You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myriad.apache.org by jp...@apache.org on 2018/09/12 15:52:07 UTC
[11/26] incubator-myriad git commit: Upgrade mesos driver to Mesos
1.5 with protobuf 2.5
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/proto/mesos/v1/scheduler.proto
----------------------------------------------------------------------
diff --git a/myriad-commons/proto/mesos/v1/scheduler.proto b/myriad-commons/proto/mesos/v1/scheduler.proto
new file mode 100644
index 0000000..1fb0254
--- /dev/null
+++ b/myriad-commons/proto/mesos/v1/scheduler.proto
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+import "mesos/v1/mesos.proto";
+
+package mesos.v1.scheduler;
+
+option java_package = "org.apache.mesos.v1.scheduler";
+option java_outer_classname = "Protos";
+
+
+/**
+ * Scheduler event API.
+ *
+ * An event is described using the standard protocol buffer "union"
+ * trick, see:
+ * https://developers.google.com/protocol-buffers/docs/techniques#union.
+ */
+message Event {
+ // Possible event types, followed by message definitions if
+ // applicable.
+ enum Type {
+ // This must be the first enum value in this list, to
+ // ensure that if 'type' is not set, the default value
+ // is UNKNOWN. This enables enum values to be added
+ // in a backwards-compatible way. See: MESOS-4997.
+ UNKNOWN = 0;
+
+ SUBSCRIBED = 1; // See 'Subscribed' below.
+ OFFERS = 2; // See 'Offers' below.
+ INVERSE_OFFERS = 9; // See 'InverseOffers' below.
+ RESCIND = 3; // See 'Rescind' below.
+ RESCIND_INVERSE_OFFER = 10; // See 'RescindInverseOffer' below.
+ UPDATE = 4; // See 'Update' below.
+ MESSAGE = 5; // See 'Message' below.
+ FAILURE = 6; // See 'Failure' below.
+ ERROR = 7; // See 'Error' below.
+
+ // Periodic message sent by the Mesos master according to
+ // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does
+ // not receive any events (including heartbeats) for an extended
+ // period of time (e.g., 5 x heartbeat_interval_seconds), there is
+ // likely a network partition. In such a case the scheduler should
+ // close the existing subscription connection and resubscribe
+ // using a backoff strategy.
+ HEARTBEAT = 8;
+ }
+
+ // First event received when the scheduler subscribes.
+ message Subscribed {
+ required FrameworkID framework_id = 1;
+
+ // This value will be set if the master is sending heartbeats. See
+ // the comment above on 'HEARTBEAT' for more details.
+ optional double heartbeat_interval_seconds = 2;
+
+ // Since Mesos 1.1.
+ optional MasterInfo master_info = 3;
+ }
+
+ // Received whenever there are new resources that are offered to the
+ // scheduler. Each offer corresponds to a set of resources on an
+ // agent. Until the scheduler accepts or declines an offer the
+ // resources are considered allocated to the scheduler.
+ message Offers {
+ repeated Offer offers = 1;
+ }
+
+ // Received whenever there are resources requested back from the
+ // scheduler. Each inverse offer specifies the agent, and
+ // optionally specific resources. Accepting or Declining an inverse
+ // offer informs the allocator of the scheduler's ability to release
+ // the specified resources without violating an SLA. If no resources
+ // are specified then all resources on the agent are requested to be
+ // released.
+ message InverseOffers {
+ repeated InverseOffer inverse_offers = 1;
+ }
+
+ // Received when a particular offer is no longer valid (e.g., the
+ // agent corresponding to the offer has been removed) and hence
+ // needs to be rescinded. Any future calls ('Accept' / 'Decline') made
+ // by the scheduler regarding this offer will be invalid.
+ message Rescind {
+ required OfferID offer_id = 1;
+ }
+
+ // Received when a particular inverse offer is no longer valid
+ // (e.g., the agent corresponding to the offer has been removed)
+ // and hence needs to be rescinded. Any future calls ('Accept' /
+ // 'Decline') made by the scheduler regarding this inverse offer
+ // will be invalid.
+ message RescindInverseOffer {
+ required OfferID inverse_offer_id = 1;
+ }
+
+ // Received whenever there is a status update that is generated by
+ // the executor or agent or master. Status updates should be used by
+ // executors to reliably communicate the status of the tasks that
+ // they manage. It is crucial that a terminal update (see TaskState
+ // in v1/mesos.proto) is sent by the executor as soon as the task
+ // terminates, in order for Mesos to release the resources allocated
+ // to the task. It is also the responsibility of the scheduler to
+ // explicitly acknowledge the receipt of a status update. See
+ // 'Acknowledge' in the 'Call' section below for the semantics.
+ //
+ // A task status update may be used for guaranteed delivery of some
+ // task-related information, e.g., task's health update. Such
+ // information may be shadowed by subsequent task status updates, that
+ // do not preserve fields of the previously sent message.
+ message Update {
+ required TaskStatus status = 1;
+ }
+
+ // Received when a custom message generated by the executor is
+ // forwarded by the master. Note that this message is not
+ // interpreted by Mesos and is only forwarded (without reliability
+ // guarantees) to the scheduler. It is up to the executor to retry
+ // if the message is dropped for any reason.
+ message Message {
+ required AgentID agent_id = 1;
+ required ExecutorID executor_id = 2;
+ required bytes data = 3;
+ }
+
+ // Received when an agent is removed from the cluster (e.g., failed
+ // health checks) or when an executor is terminated. Note that, this
+ // event coincides with receipt of terminal UPDATE events for any
+ // active tasks belonging to the agent or executor and receipt of
+ // 'Rescind' events for any outstanding offers belonging to the
+ // agent. Note that there is no guaranteed order between the
+ // 'Failure', 'Update' and 'Rescind' events when an agent or executor
+ // is removed.
+ // TODO(vinod): Consider splitting the lost agent and terminated
+ // executor into separate events and ensure it's reliably generated.
+ message Failure {
+ optional AgentID agent_id = 1;
+
+ // If this was just a failure of an executor on an agent then
+ // 'executor_id' will be set and possibly 'status' (if we were
+ // able to determine the exit status).
+ optional ExecutorID executor_id = 2;
+
+ // On Posix, `status` corresponds to termination information in the
+ // `stat_loc` area returned from a `waitpid` call. On Windows, `status`
+ // is obtained via calling the `GetExitCodeProcess()` function. For
+ // messages coming from Posix agents, schedulers need to apply
+ // `WEXITSTATUS` family macros or equivalent transformations to obtain
+ // exit codes.
+ //
+ // TODO(alexr): Consider unifying Windows and Posix behavior by returning
+ // exit code here, see MESOS-7241.
+ optional int32 status = 3;
+ }
+
+ // Received when there is an unrecoverable error in the scheduler (e.g.,
+ // scheduler failed over, rate limiting, authorization errors etc.). The
+ // scheduler should abort on receiving this event.
+ message Error {
+ required string message = 1;
+ }
+
+ // Type of the event, indicates which optional field below should be
+ // present if that type has a nested message definition.
+ // Enum fields should be optional, see: MESOS-4997.
+ optional Type type = 1;
+
+ optional Subscribed subscribed = 2;
+ optional Offers offers = 3;
+ optional InverseOffers inverse_offers = 9;
+ optional Rescind rescind = 4;
+ optional RescindInverseOffer rescind_inverse_offer = 10;
+ optional Update update = 5;
+ optional Message message = 6;
+ optional Failure failure = 7;
+ optional Error error = 8;
+}
+
+
+/**
+ * Scheduler call API.
+ *
+ * Like Event, a Call is described using the standard protocol buffer
+ * "union" trick (see above).
+ */
+message Call {
+ // Possible call types, followed by message definitions if
+ // applicable.
+ enum Type {
+ // See comments above on `Event::Type` for more details on this enum value.
+ UNKNOWN = 0;
+
+ SUBSCRIBE = 1; // See 'Subscribe' below.
+ TEARDOWN = 2; // Shuts down all tasks/executors and removes framework.
+ ACCEPT = 3; // See 'Accept' below.
+ DECLINE = 4; // See 'Decline' below.
+ ACCEPT_INVERSE_OFFERS = 13; // See 'AcceptInverseOffers' below.
+ DECLINE_INVERSE_OFFERS = 14; // See 'DeclineInverseOffers' below.
+ REVIVE = 5; // Removes any previous filters set via ACCEPT or DECLINE.
+ KILL = 6; // See 'Kill' below.
+ SHUTDOWN = 7; // See 'Shutdown' below.
+ ACKNOWLEDGE = 8; // See 'Acknowledge' below.
+ RECONCILE = 9; // See 'Reconcile' below.
+ MESSAGE = 10; // See 'Message' below.
+ REQUEST = 11; // See 'Request' below.
+ SUPPRESS = 12; // Inform master to stop sending offers to the framework.
+
+ // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for
+ // already subscribed frameworks as a way of stopping offers from
+ // being generated and other events from being sent by the master.
+ // Note that this functionality existed originally to support
+ // SchedulerDriver::abort which was only necessary to handle
+ // exceptions getting thrown from within Scheduler callbacks,
+ // something that is not an issue with the Event/Call API.
+ }
+
+ // Subscribes the scheduler with the master to receive events. A
+ // scheduler must send other calls only after it has received the
+ // SUBCRIBED event.
+ message Subscribe {
+ // See the comments below on 'framework_id' on the semantics for
+ // 'framework_info.id'.
+ required FrameworkInfo framework_info = 1;
+
+ // List of suppressed roles for which the framework does not wish to be
+ // offered resources. The framework can decide to suppress all or a subset
+ // of roles the framework (re)registers as.
+ repeated string suppressed_roles = 2;
+ }
+
+ // Accepts an offer, performing the specified operations
+ // in a sequential manner.
+ //
+ // E.g. Launch a task with a newly reserved persistent volume:
+ //
+ // Accept {
+ // offer_ids: [ ... ]
+ // operations: [
+ // { type: RESERVE,
+ // reserve: { resources: [ disk(role):2 ] } }
+ // { type: CREATE,
+ // create: { volumes: [ disk(role):1+persistence ] } }
+ // { type: LAUNCH,
+ // launch: { task_infos ... disk(role):1;disk(role):1+persistence } }
+ // ]
+ // }
+ //
+ // Note that any of the offer’s resources not used in the 'Accept'
+ // call (e.g., to launch a task) are considered unused and might be
+ // reoffered to other frameworks. In other words, the same OfferID
+ // cannot be used in more than one 'Accept' call.
+ message Accept {
+ repeated OfferID offer_ids = 1;
+ repeated Offer.Operation operations = 2;
+ optional Filters filters = 3;
+ }
+
+ // Declines an offer, signaling the master to potentially reoffer
+ // the resources to a different framework. Note that this is same
+ // as sending an Accept call with no operations. See comments on
+ // top of 'Accept' for semantics.
+ message Decline {
+ repeated OfferID offer_ids = 1;
+ optional Filters filters = 2;
+ }
+
+ // Accepts an inverse offer. Inverse offers should only be accepted
+ // if the resources in the offer can be safely evacuated before the
+ // provided unavailability.
+ message AcceptInverseOffers {
+ repeated OfferID inverse_offer_ids = 1;
+ optional Filters filters = 2;
+ }
+
+ // Declines an inverse offer. Inverse offers should be declined if
+ // the resources in the offer might not be safely evacuated before
+ // the provided unavailability.
+ message DeclineInverseOffers {
+ repeated OfferID inverse_offer_ids = 1;
+ optional Filters filters = 2;
+ }
+
+ // Revive offers for the specified roles. If `roles` is empty,
+ // the `REVIVE` call will revive offers for all of the roles
+ // the framework is currently subscribed to.
+ message Revive {
+ repeated string roles = 1;
+ }
+
+ // Kills a specific task. If the scheduler has a custom executor,
+ // the kill is forwarded to the executor and it is up to the
+ // executor to kill the task and send a TASK_KILLED (or TASK_FAILED)
+ // update. Note that Mesos releases the resources for a task once it
+ // receives a terminal update (See TaskState in v1/mesos.proto) for
+ // it. If the task is unknown to the master, a TASK_LOST update is
+ // generated.
+ //
+ // If a task within a task group is killed before the group is
+ // delivered to the executor, all tasks in the task group are
+ // killed. When a task group has been delivered to the executor,
+ // it is up to the executor to decide how to deal with the kill.
+ // Note The default Mesos executor will currently kill all the
+ // tasks in the task group if it gets a kill for any task.
+ message Kill {
+ required TaskID task_id = 1;
+ optional AgentID agent_id = 2;
+
+ // If set, overrides any previously specified kill policy for this task.
+ // This includes 'TaskInfo.kill_policy' and 'Executor.kill.kill_policy'.
+ // Can be used to forcefully kill a task which is already being killed.
+ optional KillPolicy kill_policy = 3;
+ }
+
+ // Shuts down a custom executor. When the executor gets a shutdown
+ // event, it is expected to kill all its tasks (and send TASK_KILLED
+ // updates) and terminate. If the executor doesn’t terminate within
+ // a certain timeout (configurable via
+ // '--executor_shutdown_grace_period' agent flag), the agent will
+ // forcefully destroy the container (executor and its tasks) and
+ // transition its active tasks to TASK_LOST.
+ message Shutdown {
+ required ExecutorID executor_id = 1;
+ required AgentID agent_id = 2;
+ }
+
+ // Acknowledges the receipt of status update. Schedulers are
+ // responsible for explicitly acknowledging the receipt of status
+ // updates that have 'Update.status().uuid()' field set. Such status
+ // updates are retried by the agent until they are acknowledged by
+ // the scheduler.
+ message Acknowledge {
+ required AgentID agent_id = 1;
+ required TaskID task_id = 2;
+ required bytes uuid = 3;
+ }
+
+ // Allows the scheduler to query the status for non-terminal tasks.
+ // This causes the master to send back the latest task status for
+ // each task in 'tasks', if possible. Tasks that are no longer known
+ // will result in a TASK_LOST, TASK_UNKNOWN, or TASK_UNREACHABLE update.
+ // If 'tasks' is empty, then the master will send the latest status
+ // for each task currently known.
+ message Reconcile {
+ // TODO(vinod): Support arbitrary queries than just state of tasks.
+ message Task {
+ required TaskID task_id = 1;
+ optional AgentID agent_id = 2;
+ }
+
+ repeated Task tasks = 1;
+ }
+
+ // Sends arbitrary binary data to the executor. Note that Mesos
+ // neither interprets this data nor makes any guarantees about the
+ // delivery of this message to the executor.
+ message Message {
+ required AgentID agent_id = 1;
+ required ExecutorID executor_id = 2;
+ required bytes data = 3;
+ }
+
+ // Requests a specific set of resources from Mesos's allocator. If
+ // the allocator has support for this, corresponding offers will be
+ // sent asynchronously via the OFFERS event(s).
+ //
+ // NOTE: The built-in hierarchical allocator doesn't have support
+ // for this call and hence simply ignores it.
+ message Request {
+ repeated mesos.v1.Request requests = 1;
+ }
+
+ // Suppress offers for the specified roles. If `roles` is empty,
+ // the `SUPPRESS` call will suppress offers for all of the roles
+ // the framework is currently subscribed to.
+ message Suppress {
+ repeated string roles = 1;
+ }
+
+ // Identifies who generated this call. Master assigns a framework id
+ // when a new scheduler subscribes for the first time. Once assigned,
+ // the scheduler must set the 'framework_id' here and within its
+ // FrameworkInfo (in any further 'Subscribe' calls). This allows the
+ // master to identify a scheduler correctly across disconnections,
+ // failovers, etc.
+ optional FrameworkID framework_id = 1;
+
+ // Type of the call, indicates which optional field below should be
+ // present if that type has a nested message definition.
+ // See comments on `Event::Type` above on the reasoning behind this field being optional.
+ optional Type type = 2;
+
+ optional Subscribe subscribe = 3;
+ optional Accept accept = 4;
+ optional Decline decline = 5;
+ optional AcceptInverseOffers accept_inverse_offers = 13;
+ optional DeclineInverseOffers decline_inverse_offers = 14;
+ optional Revive revive = 15;
+ optional Kill kill = 6;
+ optional Shutdown shutdown = 7;
+ optional Acknowledge acknowledge = 8;
+ optional Reconcile reconcile = 9;
+ optional Message message = 10;
+ optional Request request = 11;
+ optional Suppress suppress = 16;
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/Executor.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/Executor.java b/myriad-commons/src/main/java/org/apache/mesos/Executor.java
new file mode 100644
index 0000000..095ca65
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/Executor.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+import org.apache.mesos.Protos.*;
+
+/**
+ * Callback interface to be implemented by frameworks' executors.
+ * Note that only one callback will be invoked at a time, so it is not
+ * recommended that you block within a callback because it may cause a
+ * deadlock.
+ * <p>
+ * Each callback includes a reference to the executor driver that was
+ * used to run this executor. The reference will not change for the
+ * duration of an executor (i.e., from the point you do
+ * {@link ExecutorDriver#start} to the point that
+ * {@link ExecutorDriver#join} returns).
+ * This is intended for convenience so that an executor
+ * doesn't need to store a reference to the driver itself.
+ */
+public interface Executor {
+
+ /**
+ * Invoked once the executor driver has been able to successfully
+ * connect with Mesos. In particular, a scheduler can pass some
+ * data to its executors through the {@link ExecutorInfo#getData()}
+ * field.
+ *
+ * @param driver The executor driver that was registered and connected
+ * to the Mesos cluster.
+ * @param executorInfo Describes information about the executor that was
+ * registered.
+ * @param frameworkInfo Describes the framework that was registered.
+ * @param slaveInfo Describes the slave that will be used to launch
+ * the tasks for this executor.
+ *
+ * @see ExecutorDriver
+ * @see MesosSchedulerDriver
+ */
+ // TODO(vinod): Add a new reregistered callback for when the executor
+ // re-connects with a restarted slave.
+ void registered(ExecutorDriver driver,
+ ExecutorInfo executorInfo,
+ FrameworkInfo frameworkInfo,
+ SlaveInfo slaveInfo);
+
+ /**
+ * Invoked when the executor re-registers with a restarted slave.
+ *
+ * @param driver The executor driver that was re-registered with the
+ * Mesos master.
+ * @param slaveInfo Describes the slave that will be used to launch
+ * the tasks for this executor.
+ *
+ * @see ExecutorDriver
+ */
+ void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo);
+
+ /**
+ * Invoked when the executor becomes "disconnected" from the slave
+ * (e.g., the slave is being restarted due to an upgrade).
+ *
+ * @param driver The executor driver that was disconnected.
+ */
+ void disconnected(ExecutorDriver driver);
+
+ /**
+ * Invoked when a task has been launched on this executor (initiated
+ * via {@link SchedulerDriver#launchTasks}. Note that this task can be
+ * realized with a thread, a process, or some simple computation,
+ * however, no other callbacks will be invoked on this executor
+ * until this callback has returned.
+ *
+ * @param driver The executor driver that launched the task.
+ * @param task Describes the task that was launched.
+ *
+ * @see ExecutorDriver
+ * @see TaskInfo
+ */
+ void launchTask(ExecutorDriver driver, TaskInfo task);
+
+ /**
+ * Invoked when a task running within this executor has been killed
+ * (via {@link org.apache.mesos.SchedulerDriver#killTask}). Note that no
+ * status update will be sent on behalf of the executor, the executor is
+ * responsible for creating a new TaskStatus (i.e., with TASK_KILLED)
+ * and invoking {@link ExecutorDriver#sendStatusUpdate}.
+ *
+ * @param driver The executor driver that owned the task that was killed.
+ * @param taskId The ID of the task that was killed.
+ *
+ * @see ExecutorDriver
+ * @see TaskID
+ */
+ void killTask(ExecutorDriver driver, TaskID taskId);
+
+ /**
+ * Invoked when a framework message has arrived for this
+ * executor. These messages are best effort; do not expect a
+ * framework message to be retransmitted in any reliable fashion.
+ *
+ * @param driver The executor driver that received the message.
+ * @param data The message payload.
+ *
+ * @see ExecutorDriver
+ */
+ void frameworkMessage(ExecutorDriver driver, byte[] data);
+
+ /**
+ * Invoked when the executor should terminate all of its currently
+ * running tasks. Note that after Mesos has determined that an
+ * executor has terminated any tasks that the executor did not send
+ * terminal status updates for (e.g. TASK_KILLED, TASK_FINISHED,
+ * TASK_FAILED, etc) a TASK_LOST status update will be created.
+ *
+ * @param driver The executor driver that should terminate.
+ *
+ * @see ExecutorDriver
+ */
+ void shutdown(ExecutorDriver driver);
+
+ /**
+ * Invoked when a fatal error has occurred with the executor and/or
+ * executor driver. The driver will be aborted BEFORE invoking this
+ * callback.
+ *
+ * @param driver The executor driver that was aborted due this error.
+ * @param message The error message.
+ *
+ * @see ExecutorDriver
+ */
+ void error(ExecutorDriver driver, String message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java b/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java
new file mode 100644
index 0000000..68d3ea9
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+import org.apache.mesos.Protos.*;
+
+/**
+ * Abstract interface for connecting an executor to Mesos. This
+ * interface is used both to manage the executor's lifecycle (start
+ * it, stop it, or wait for it to finish) and to interact with Mesos
+ * (e.g., send status updates, send framework messages, etc.).
+ */
+public interface ExecutorDriver {
+ /**
+ * Starts the executor driver. This needs to be called before any
+ * other driver calls are made.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status start();
+
+ /**
+ * Stops the executor driver.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status stop();
+
+ /**
+ * Aborts the driver so that no more callbacks can be made to the
+ * executor. The semantics of abort and stop have deliberately been
+ * separated so that code can detect an aborted driver (i.e., via
+ * the return status of {@link ExecutorDriver#join}, see below),
+ * and instantiate and start another driver if desired (from within
+ * the same process ... although this functionality is currently not
+ * supported for executors).
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status abort();
+
+ /**
+ * Waits for the driver to be stopped or aborted, possibly
+ * _blocking_ the current thread indefinitely. The return status of
+ * this function can be used to determine if the driver was aborted
+ * (see mesos.proto for a description of Status).
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status join();
+
+ /**
+ * Starts and immediately joins (i.e., blocks on) the driver.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status run();
+
+ /**
+ * Sends a status update to the framework scheduler, retrying as
+ * necessary until an acknowledgement has been received or the
+ * executor is terminated (in which case, a TASK_LOST status update
+ * will be sent). See {@link Scheduler#statusUpdate} for more
+ * information about status update acknowledgements.
+ *
+ * @param status The status update to send.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status sendStatusUpdate(TaskStatus status);
+
+ /**
+ * Sends a message to the framework scheduler. These messages are
+ * best effort; do not expect a framework message to be
+ * retransmitted in any reliable fashion.
+ *
+ * @param data The message payload.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see Status
+ */
+ public Status sendFrameworkMessage(byte[] data);
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/Log.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/Log.java b/myriad-commons/src/main/java/org/apache/mesos/Log.java
new file mode 100644
index 0000000..6603263
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/Log.java
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import java.util.List;
+import java.util.Set;
+
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides access to a distributed append only log. The log can be
+ * read from using a {@link Log.Reader} and written to using a
+ * {@link Log.Writer}.
+ *
+ * <p>Both the <i>Reader</i> and <i>Writer</i> will require a <i>quorum</i>
+ * which defines the <i>ratio of active Mesos Masters</i> that need to be
+ * available for a successful read or write. The <i>quorum</i> will be satisfied
+ * when the number of <i>active Masters</i> is greater than the given
+ * <i>number</i>:
+ * <pre>{@code
+ * Quorum > (Number of Masters)/2
+ * }</pre>
+ *
+ * <p>If a <i>read</i> or <i>write</i> is executed the operation will wait
+ * until their is <i>quorum</i> to succeed.
+ */
+public class Log {
+ static {
+ MesosNativeLibrary.load();
+ }
+
+ /**
+ * An opaque identifier of a log entry's position within the
+ * log. Can be used to inidicate {@link Log.Reader#read read} ranges and
+ * {@link Log.Writer#truncate truncation} locations.
+ */
+ public static class Position implements Comparable<Position> {
+ @Override
+ public int compareTo(Position that) {
+ return Long.signum(value - that.value);
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ return that instanceof Position && value == ((Position) that).value;
+ }
+
+ @Override
+ public String toString() {
+ return "Position " + value;
+ }
+
+ /**
+ * Returns an "identity" of this position, useful for serializing
+ * to logs or across communication mediums.
+ *
+ * @return The identity in bytes.
+ */
+ public byte[] identity() {
+ byte[] bytes = new byte[8];
+ bytes[0] = (byte) (0xff & (value >> 56));
+ bytes[1] = (byte) (0xff & (value >> 48));
+ bytes[2] = (byte) (0xff & (value >> 40));
+ bytes[3] = (byte) (0xff & (value >> 32));
+ bytes[4] = (byte) (0xff & (value >> 24));
+ bytes[5] = (byte) (0xff & (value >> 16));
+ bytes[6] = (byte) (0xff & (value >> 8));
+ bytes[7] = (byte) (0xff & value);
+ return bytes;
+ }
+
+ /**
+ * Creates a position identified by an integral {@code value}.
+ * <p>
+ * Positions are typically only created by the log implementation. Log
+ * users should only ever need to call this constructor in unit tests.
+ *
+ * @param value The marker for this position in the log.
+ */
+ public Position(long value) {
+ this.value = value;
+ }
+
+ private final long value;
+ }
+
+ /**
+ * Represents an opaque data entry in the {@link Log} with a
+ * {@link Log.Position}.
+ */
+ public static class Entry {
+ /**
+ * The position of this entry.
+ * @see Position
+ */
+ public final Position position;
+ /** The data at the given position.*/
+ public final byte[] data;
+
+ /**
+ * Creates a log entry.
+ * <p>
+ * Entries are typically only created by the log implementation. Log
+ * users should only ever need to call this constructor in unit tests.
+ *
+ * @param position The unique position of this entry within the log.
+ * @param data The content stored in this entry.
+ */
+ public Entry(Position position, byte[] data) {
+ this.position = position;
+ this.data = data;
+ }
+ }
+
+ /**
+ * An exception that gets thrown when an error occurs while
+ * performing a read or write operation.
+ */
+ public static class OperationFailedException extends Exception {
+ /**
+ * @param message The message for this exception.
+ */
+ public OperationFailedException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message The message for this exception.
+ * @param cause The underlying reason this exception was generated.
+ */
+ public OperationFailedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An exception that gets thrown when a writer no longer has the
+ * ability to perform operations (e.g., because it was superseded by
+ * another writer).
+ */
+ public static class WriterFailedException extends Exception {
+ /**
+ * @param message The message for this exception.
+ */
+ public WriterFailedException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message The message for this exception.
+ * @param cause The underlying reason this exception was generated.
+ */
+ public WriterFailedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * Provides read access to the {@link Log}. This class is safe for
+ * use from multiple threads and for the life of the log regardless
+ * of any exceptions thrown from its methods.
+ */
+ public static class Reader {
+ /**
+ * Returns an instance of a reader that will access the given instance of
+ * the Log.
+ * @param log The log that this reader will access.
+ */
+ public Reader(Log log) {
+ this.log = log;
+ initialize(log);
+ }
+
+ /**
+ * Attempts to read from the log between the specified positions
+ * (inclusive). If either of the positions are invalid, an
+ * {@link OperationFailedException} will get thrown. Unfortunately, this
+ * will also get thrown in other circumstances (e.g., disk
+ * failure) and therefore it is currently impossible to tell these
+ * two cases apart.
+ *
+ * @param from Where to start reading.
+ * @param to Where to finish reading.
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ *
+ * @return The list of entries fetched from the Log.
+ *
+ * @throws TimeoutException If the read doesn't happen before the
+ * timeout.
+ * @throws OperationFailedException If the read fails due that the read no
+ * longer has the ability to perform its
+ * operations.
+ * @see Position
+ * @see TimeUnit
+ */
+ public native List<Entry> read(Position from,
+ Position to,
+ long timeout,
+ TimeUnit unit)
+ throws TimeoutException, OperationFailedException;
+
+ /**
+ * Returns the beginning position of the log (might be out of date
+ * with respect to another replica).
+ *
+ * @return The beginning position of the log.
+ */
+ public native Position beginning();
+
+ /**
+ * Returns the ending position of the log (might be out of date
+ * with respect to another replica).
+ *
+ * @return The ending position of the log
+ */
+ public native Position ending();
+
+ /**
+ * Attempts to catch-up positions from the log for reading.
+ *
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ *
+ * @return The ending position of the caught-up range.
+ *
+ * @throws TimeoutException If the catch-up doesn't happen before
+ * the timeout.
+ * @throws OperationFailedException If the catch-up fails.
+ */
+ public native Position catchup(long timeout, TimeUnit unit)
+ throws TimeoutException, OperationFailedException;
+
+ protected native void initialize(Log log);
+
+ protected native void finalize();
+
+ private Log log; // Keeps the log from getting garbage collected.
+ private long __log;
+ private long __reader;
+ }
+
+ /**
+ * Provides write access to the {@link Log}. This class is not safe
+ * for use from multiple threads and instances should be thrown out
+ * after any {@link WriterFailedException} is thrown.
+ */
+ public static class Writer {
+ /**
+ * Constructs a writer linked the given {@link Log}.
+ *
+ * @param log The log that this writer will access.
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ * @param retries Number of retries
+ *
+ * @see TimeUnit
+ */
+ public Writer(Log log, long timeout, TimeUnit unit, int retries) {
+ this.log = log;
+ initialize(log, timeout, unit, retries);
+ }
+
+ /**
+ * Attempts to append to the log with the specified data returning
+ * the new end position of the log if successful.
+ *
+ * @param data Data to append to the log.
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ *
+ * @return The new end-position.
+ *
+ * @throws TimeoutException If the append doesn't happen before the
+ * timeout.
+ * @throws WriterFailedException If the append fails due that the writer
+ * no longer has the ability to perform its
+ * operations (e.g., because it was
+ * superseded by another writer).
+ * @see TimeUnit
+ * @see WriterFailedException
+ */
+ public native Position append(byte[] data, long timeout, TimeUnit unit)
+ throws TimeoutException, WriterFailedException;
+
+ /**
+ * Attempts to truncate the log (from the beginning to the
+ * specified position exclusive) If the position is invalid, an
+ * {@link WriterFailedException} will get thrown. Unfortunately, this will
+ * also get thrown in other circumstances (e.g., disk failure) and
+ * therefore it is currently impossible to tell these two cases
+ * apart.
+ *
+
+ * @param to The log will be truncated up to this point.
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ *
+ * @return The position after the truncation.
+ *
+ * @throws TimeoutException If the truncation doesn't happen before
+ * the timeout.
+ * @throws WriterFailedException If the truncation fails due an invalid
+ * position or if the writer no longer has
+ * the ability to perform its operations
+ * (e.g., because it was superseded by
+ * another writer).
+ */
+ // TODO(benh): Throw both OperationFailedException and WriterFailedException
+ // to differentiate the need for a new writer from a bad
+ // position, or a bad disk, etc.
+ public native Position truncate(Position to, long timeout, TimeUnit unit)
+ throws TimeoutException, WriterFailedException;
+
+ protected native void initialize(Log log,
+ long timeout,
+ TimeUnit unit,
+ int retries);
+
+ protected native void finalize();
+
+ private Log log; // Keeps the log from getting garbage collected.
+ private long __log;
+ private long __writer;
+ }
+
+ /**
+ * Creates a new replicated log that assumes the specified quorum
+ * size, is backed by a file at the specified path, and coordiantes
+ * with other replicas via the set of process PIDs.
+ *
+ * @param quorum The quorum size.
+ * @param path Path to the file backing this log.
+ * @param pids PIDs of the replicas to coordinate with.
+ */
+ public Log(int quorum,
+ String path,
+ Set<String> pids) {
+ initialize(quorum, path, pids);
+ }
+
+ /**
+ * Creates a new replicated log that assumes the specified quorum
+ * size, is backed by a file at the specified path, and coordiantes
+ * with other replicas associated with the specified ZooKeeper
+ * servers, timeout, and znode (or Zookeeper name space).
+ *
+ * @param quorum The quorum size.
+ * @param path Path to the file backing this log.
+ * @param servers List of ZooKeeper servers (e.g., 'ip1:port1,ip2:port2').
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ * @param znode Path to znode where "state" should be rooted.
+ */
+ public Log(int quorum,
+ String path,
+ String servers,
+ long timeout,
+ TimeUnit unit,
+ String znode) {
+ initialize(quorum, path, servers, timeout, unit, znode);
+ }
+
+ /**
+ * Creates a new replicated log that assumes the specified quorum
+ * size, is backed by a file at the specified path, and coordiantes
+ * with other replicas associated with the specified ZooKeeper
+ * servers, timeout, and znode (or Zookeeper name space).
+ *
+ * @param quorum The quorum size.
+ * @param path Path to the file backing this log.
+ * @param servers Zookeper servers/connection string.
+ * @param timeout Max number of time units to wait before a
+ * {@link TimeoutException}.
+ * @param unit Type of time units used for the timeout, e.g. seconds,
+ * minutes, etc.
+ * @param znode The Zookeeper name space.
+ * @param scheme Authentication scheme (e.g., "digest").
+ * @param credentials Authentication credentials (e.g., "user:pass").
+ */
+ public Log(int quorum,
+ String path,
+ String servers,
+ long timeout,
+ TimeUnit unit,
+ String znode,
+ String scheme,
+ byte[] credentials) {
+ initialize(quorum, path, servers, timeout, unit, znode, scheme, credentials);
+ }
+
+ /**
+ * Returns a position based off of the bytes recovered from
+ * Position.identity().
+ *
+ * @param identity Identity, in bytes, of the position.
+ *
+ * @return The position.
+ */
+ public Position position(byte[] identity) {
+ long value =
+ ((long) (identity[0] & 0xff) << 56) |
+ ((long) (identity[1] & 0xff) << 48) |
+ ((long) (identity[2] & 0xff) << 40) |
+ ((long) (identity[3] & 0xff) << 32) |
+ ((long) (identity[4] & 0xff) << 24) |
+ ((long) (identity[5] & 0xff) << 16) |
+ ((long) (identity[6] & 0xff) << 8) |
+ ((long) (identity[7] & 0xff));
+ return new Position(value);
+ }
+
+ protected native void initialize(int quorum,
+ String path,
+ Set<String> pids);
+
+ protected native void initialize(int quorum,
+ String path,
+ String servers,
+ long timeout,
+ TimeUnit unit,
+ String znode);
+
+ protected native void initialize(int quorum,
+ String path,
+ String servers,
+ long timeout,
+ TimeUnit unit,
+ String znode,
+ String scheme,
+ byte[] credentials);
+
+ protected native void finalize();
+
+ private long __log;
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java b/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java
new file mode 100644
index 0000000..b52a81b
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+import org.apache.mesos.Protos.*;
+
+/**
+ * Concrete implementation of an ExecutorDriver that connects an
+ * Executor with a Mesos slave. The MesosExecutorDriver is
+ * thread-safe.
+ * <p>
+ * The driver is responsible for invoking the Executor callbacks as it
+ * communicates with the Mesos slave.
+ * </p>
+ * <p>
+ * Note that blocking on the MesosExecutorDriver (e.g., via {@link
+ * #join}) doesn't affect the executor callbacks in anyway because
+ * they are handled by a different thread.
+ * </p>
+ * <p>
+ * Note that the driver uses GLOG to do its own logging. GLOG flags can
+ * be set via environment variables, prefixing the flag name with
+ * "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
+ * src/logging/flags.hpp. Mesos flags can also be set via environment
+ * variables, prefixing the flag name with "MESOS_", e.g.,
+ * "MESOS_QUIET=1".
+ * </p>
+ * <p>
+ * See src/examples/java/TestExecutor.java for an example of using the
+ * MesosExecutorDriver.
+ * </p>
+ */
+public class MesosExecutorDriver implements ExecutorDriver {
+ static {
+ MesosNativeLibrary.load();
+ }
+
+ /**
+ * Creates a new driver that uses the specified Executor.
+ *
+ * @param executor The instance of the executor that will be used
+ * to connect to the slave.
+ *
+ * @see Executor
+ */
+ public MesosExecutorDriver(Executor executor) {
+ if (executor == null) {
+ throw new NullPointerException("Not expecting a null Executor");
+ }
+
+ this.executor = executor;
+
+ initialize();
+ }
+
+ /**
+ * See ExecutorDriver for descriptions of these.
+ *
+ * @see ExecutorDriver
+ */
+ public native Status start();
+ public native Status stop();
+ public native Status abort();
+ public native Status join();
+
+ public Status run() {
+ Status status = start();
+ return status != Status.DRIVER_RUNNING ? status : join();
+ }
+
+ public native Status sendStatusUpdate(TaskStatus status);
+ public native Status sendFrameworkMessage(byte[] data);
+
+ protected native void initialize();
+ protected native void finalize();
+
+ private final Executor executor;
+
+ private long __executor;
+ private long __driver;
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java b/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java
new file mode 100644
index 0000000..4ce325c
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+public class MesosNativeLibrary {
+ /**
+ * Represent a 'libmesos' version with Major, Minor, and Patch versions. We
+ * use a class here to make it easier to do version compatibility checking.
+ * For example:
+ * <pre>
+ * {@code
+ * static Version BugFixVersion = new Version(0, 22, 1);
+ * public static void myFunction() {
+ * if (version().compareTo(BugFixVersion) >= 0) {
+ * // New behavior with bug fix.
+ * } else {
+ * // Old behavior for backwards compatibility.
+ * }
+ * }
+ * }
+ * </pre>
+ */
+ public static class Version implements Comparable<Version> {
+ public Version(long major, long minor, long patch) {
+ if (major < 0) {
+ throw new IllegalArgumentException(
+ "Major version must not be negative");
+ }
+
+ if (minor < 0) {
+ throw new IllegalArgumentException(
+ "Minor version must not be negative");
+ }
+
+ if (patch < 0) {
+ throw new IllegalArgumentException(
+ "Patch version must not be negative");
+ }
+
+ this.major = major;
+ this.minor = minor;
+ this.patch = patch;
+ }
+
+ public Version(long major, long minor) {
+ this(major, minor, 0);
+ }
+
+ public Version(long major) {
+ this(major, 0, 0);
+ }
+
+ public boolean equals(Version other) {
+ return other != null &&
+ major == other.major &&
+ minor == other.minor &&
+ patch == other.patch;
+ }
+
+ /**
+ * Compare this version to an 'other' one. The comparison is done
+ * lexicographically. This returns -1 if this version is 'lesser' than the
+ * other, 0 if they are equivalent, and 1 if this version is 'greater'.
+ */
+ @Override
+ public int compareTo(Version other) {
+ if (other == null) {
+ throw new IllegalArgumentException("other Version must not be null");
+ }
+
+ if (major < other.major) {
+ return -1;
+ } else if (major > other.major) {
+ return 1;
+ }
+
+ if (minor < other.minor) {
+ return -1;
+ } else if (minor > other.minor) {
+ return 1;
+ }
+
+ if (patch < other.patch) {
+ return -1;
+ } else if (patch > other.patch) {
+ return 1;
+ }
+
+ return 0;
+ }
+
+ /**
+ * A helper that is easier to use than 'compareTo', this returns
+ * true if 'this' version is strictly 'less than', not 'less than
+ * or equal to' the 'other' version.
+ */
+ public boolean before(Version other) {
+ return this.compareTo(other) < 0;
+ }
+
+ /**
+ * A helper that is easier to use than 'compareTo', this returns
+ * true if 'this' version is strictly 'greater than', not 'greater
+ * than or equal to' the 'other' version.
+ */
+ public boolean after(Version other) {
+ return this.compareTo(other) > 0;
+ }
+
+ public final long major;
+ public final long minor;
+ public final long patch;
+ }
+
+ /**
+ * Attempts to load the native library (if it was not previously loaded)
+ * from the given path. If the path is null 'java.library.path' is used to
+ * load the library.
+ */
+ public static synchronized void load(String path) {
+ // Our JNI library will actually set 'loaded' to true once it is
+ // loaded, that way the library can get loaded by a user via
+ // 'System.load' in the event that they want to specify an
+ // absolute path and we won't try and reload the library ourselves
+ // (which would probably fail because 'java.library.path' might
+ // not be set).
+ if (loaded) {
+ return;
+ }
+
+ // In some circumstances, such as when sandboxed class loaders are used,
+ // the current thread's context class loader will not be able to see
+ // MesosNativeLibrary (even when executing this code!).
+ // We therefore, temporarily swap the thread's context class loader with
+ // the class loader that loaded this class, for the duration of the native
+ // library load.
+ ClassLoader contextClassLoader =
+ Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(
+ MesosNativeLibrary.class.getClassLoader());
+ try {
+ if (path != null) {
+ System.load(path);
+ } else {
+ // TODO(tillt): Change the default fallback to JNI specific library
+ // once libmesos has been split.
+ System.loadLibrary("mesos");
+ }
+ } catch (UnsatisfiedLinkError error) {
+ System.err.println("Failed to load native Mesos library from " +
+ (path != null ? path : System.getProperty("java.library.path")));
+ throw error;
+ } finally {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+
+ public static void load() {
+ // Try to get the JNI specific library path from the environment.
+ String path = System.getenv("MESOS_NATIVE_JAVA_LIBRARY");
+
+ // As a fallback, use deprecated environment variable to extract that path.
+ if (path == null) {
+ path = System.getenv("MESOS_NATIVE_LIBRARY");
+ if (path != null) {
+ System.out.println("Warning: MESOS_NATIVE_LIBRARY is deprecated, " +
+ "use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will " +
+ "not support JNI bindings via MESOS_NATIVE_LIBRARY.");
+ }
+ }
+
+ load(path);
+ }
+
+ /**
+ * Returns the version of the native loaded library, or throws a
+ * runtime exception if the library is not loaded. This was
+ * introduced in MESOS 0.22.1. Any version prior to that will be
+ * 0.0.0. This means you should not make version specific decision
+ * before the 0.22.1 version boundary. For example, if you found a
+ * bug that was fixed in 0.19.0, you will *not* be able to perform
+ * the following check correctly:
+ *
+ * if (version().before(new Version(0, 19, 0))) {
+ * ...
+ * }
+ *
+ * This predicate will return true for all versions up until 0.22.1.
+ */
+ public static synchronized Version version() {
+ // Since we allow 'load' to be called with a parameter, we can not load on
+ // behalf of the user here. Instead, we throw an exception if the library
+ // has not been loaded.
+ if (!loaded) {
+ throw new RuntimeException("'libmesos' not loaded");
+ }
+
+ if (version == null) {
+ // Try to load the libmesos version identifier. If we get an
+ // 'UnsatisfiedLinkError' then this means we are loading a 'libmesos' with
+ // a version prior to 0.22.1, which is when the 'MAJOR', 'MINOR', and
+ // 'PATCH' version identifiers were introduced.
+ try {
+ version = _version();
+ } catch (UnsatisfiedLinkError error) {
+ System.err.println(
+ "WARNING: using an old version of 'libmesos'" +
+ " without proper version information: " + error.getMessage());
+
+ // If we're using a version of 'libmesos' less than 0.22.1, then we set
+ // the version to 0.0.0.
+ version = new Version(0, 0, 0);
+ }
+ }
+
+ return version;
+ }
+
+ public static final String VERSION = "1.5.0";
+
+ private static Version version = null;
+
+ private static boolean loaded = false;
+
+ /**
+ * Native implementation of 'libmesos' version identifier function.
+ */
+ private static native Version _version();
+}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java b/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java
new file mode 100644
index 0000000..4f61da2
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos;
+
+import java.util.Collection;
+
+import org.apache.mesos.Protos.*;
+
+/**
+ * Concrete implementation of a SchedulerDriver that connects a
+ * Scheduler with a Mesos master. The MesosSchedulerDriver is
+ * thread-safe.
+ * <p>
+ * Note that scheduler failover is supported in Mesos. After a
+ * scheduler is registered with Mesos it may failover (to a new
+ * process on the same machine or across multiple machines) by
+ * creating a new driver with the ID given to it in {@link
+ * Scheduler#registered}.
+ * <p>
+ * The driver is responsible for invoking the Scheduler callbacks as
+ * it communicates with the Mesos master.
+ * <p>
+ * Note that blocking on the MesosSchedulerDriver (e.g., via {@link
+ * #join}) doesn't affect the scheduler callbacks in anyway because
+ * they are handled by a different thread.
+ * <p>
+ * <p>
+ * Note that the driver uses GLOG to do its own logging. GLOG flags can
+ * be set via environment variables, prefixing the flag name with
+ * "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
+ * src/logging/flags.hpp. Mesos flags can also be set via environment
+ * variables, prefixing the flag name with "MESOS_", e.g.,
+ * "MESOS_QUIET=1".
+ * <p>
+ * See src/examples/java/TestFramework.java for an example of using
+ * the MesosSchedulerDriver.
+ */
+public class MesosSchedulerDriver implements SchedulerDriver {
+ static {
+ MesosNativeLibrary.load();
+ }
+
+ /**
+ * Creates a new driver for the specified scheduler. The master
+ * should be one of:
+ * <pre>
+ * {@code
+ * host:port
+ * zk://host1:port1,host2:port2,.../path
+ * zk://username:password@host1:port1,host2:port2,.../path
+ * file:///path/to/file (where file contains one of the above)
+ * }
+ * </pre>
+ * <p>
+ * The driver will attempt to "failover" if the specified
+ * FrameworkInfo includes a valid FrameworkID.
+ * <p>
+ * Any Mesos configuration options are read from environment
+ * variables, as well as any configuration files found through the
+ * environment variables.
+ * <p>
+ *
+ * @param scheduler The scheduler implementation which callbacks are invoked
+ * upon scheduler events.
+ * @param framework The frameworkInfo describing the current framework.
+ * @param master The address to the currently active Mesos master.
+ */
+ // TODO(vinod): Deprecate this in favor the constructor that takes
+ // 'credential' as parameter.
+ public MesosSchedulerDriver(Scheduler scheduler,
+ FrameworkInfo framework,
+ String master) {
+ if (scheduler == null) {
+ throw new NullPointerException("Not expecting a null Scheduler");
+ }
+
+ if (framework == null) {
+ throw new NullPointerException("Not expecting a null FrameworkInfo");
+ }
+
+ if (master == null) {
+ throw new NullPointerException("Not expecting a null master");
+ }
+
+ this.scheduler = scheduler;
+ this.framework = framework;
+ this.master = master;
+ this.implicitAcknowledgements = true;
+ this.credential = null;
+
+ initialize();
+ }
+
+ /**
+ * Same as the other constructors, except that it accepts the newly
+ * introduced 'credential' parameter.
+ *
+ * @param scheduler The scheduler implementation which callbacks are invoked
+ * upon scheduler events.
+ * @param framework The frameworkInfo describing the current framework.
+ * @param master The address to the currently active Mesos master.
+ * @param credential The credentials that will be used used to authenticate
+ * calls from this scheduler.
+ */
+ public MesosSchedulerDriver(Scheduler scheduler,
+ FrameworkInfo framework,
+ String master,
+ Credential credential) {
+
+ if (scheduler == null) {
+ throw new NullPointerException("Not expecting a null Scheduler");
+ }
+
+ if (framework == null) {
+ throw new NullPointerException("Not expecting a null FrameworkInfo");
+ }
+
+ if (master == null) {
+ throw new NullPointerException("Not expecting a null master");
+ }
+
+ if (credential == null) {
+ throw new NullPointerException("Not expecting a null credential");
+ }
+
+ this.scheduler = scheduler;
+ this.framework = framework;
+ this.master = master;
+ this.implicitAcknowledgements = true;
+ this.credential = credential;
+
+ initialize();
+ }
+
+ /**
+ * Same as the other constructors, except that it accepts the newly
+ * introduced 'implicitAcknowledgements' parameter.
+ *
+ * @param scheduler The scheduler implementation which callbacks are invoked
+ * upon scheduler events.
+ * @param framework The frameworkInfo describing the current framework.
+ * @param master The address to the currently active Mesos master.
+ * @param implicitAcknowledgements Whether the driver should send
+ * acknowledgements on behalf of the scheduler. Setting this to
+ * false allows schedulers to perform their own acknowledgements,
+ * which enables asynchronous / batch processing of status updates.
+ */
+ public MesosSchedulerDriver(Scheduler scheduler,
+ FrameworkInfo framework,
+ String master,
+ boolean implicitAcknowledgements) {
+
+ if (scheduler == null) {
+ throw new NullPointerException("Not expecting a null Scheduler");
+ }
+
+ if (framework == null) {
+ throw new NullPointerException("Not expecting a null FrameworkInfo");
+ }
+
+ if (master == null) {
+ throw new NullPointerException("Not expecting a null master");
+ }
+
+ this.scheduler = scheduler;
+ this.framework = framework;
+ this.master = master;
+ this.implicitAcknowledgements = implicitAcknowledgements;
+ this.credential = null;
+
+ initialize();
+ }
+
+ /**
+ * Same as the other constructors, except that it accepts the newly
+ * introduced 'implicitAcknowledgements' and 'credentials' parameters.
+ *
+ * @param scheduler The scheduler implementation which callbacks are invoked
+ * upon scheduler events.
+ * @param framework The frameworkInfo describing the current framework.
+ * @param master The address to the currently active Mesos master.
+ * @param implicitAcknowledgements Whether the driver should send
+ * acknowledgements on behalf of the scheduler. Setting this to
+ * false allows schedulers to perform their own acknowledgements,
+ * which enables asynchronous / batch processing of status updates.
+ * @param credential The credentials that will be used used to authenticate
+ * calls from this scheduler.
+ */
+ public MesosSchedulerDriver(Scheduler scheduler,
+ FrameworkInfo framework,
+ String master,
+ boolean implicitAcknowledgements,
+ Credential credential) {
+
+ if (scheduler == null) {
+ throw new NullPointerException("Not expecting a null Scheduler");
+ }
+
+ if (framework == null) {
+ throw new NullPointerException("Not expecting a null FrameworkInfo");
+ }
+
+ if (master == null) {
+ throw new NullPointerException("Not expecting a null master");
+ }
+
+ if (credential == null) {
+ throw new NullPointerException("Not expecting a null credential");
+ }
+
+ this.scheduler = scheduler;
+ this.framework = framework;
+ this.master = master;
+ this.implicitAcknowledgements = implicitAcknowledgements;
+ this.credential = credential;
+
+ initialize();
+ }
+
+ public native Status start();
+
+ public native Status stop(boolean failover);
+
+ public Status stop() {
+ return stop(false);
+ }
+
+ public native Status abort();
+
+ public native Status join();
+
+ public Status run() {
+ Status status = start();
+ return status != Status.DRIVER_RUNNING ? status : join();
+ }
+
+ public native Status requestResources(Collection<Request> requests);
+
+ public Status launchTasks(OfferID offerId,
+ Collection<TaskInfo> tasks) {
+ return launchTasks(offerId, tasks, Filters.newBuilder().build());
+ }
+
+ public native Status launchTasks(OfferID offerId,
+ Collection<TaskInfo> tasks,
+ Filters filters);
+ public Status launchTasks(Collection<OfferID> offerIds,
+ Collection<TaskInfo> tasks) {
+ return launchTasks(offerIds, tasks, Filters.newBuilder().build());
+ }
+
+ public native Status launchTasks(Collection<OfferID> offerIds,
+ Collection<TaskInfo> tasks,
+ Filters filters);
+
+ public native Status killTask(TaskID taskId);
+
+ public native Status acceptOffers(Collection<OfferID> offerIds,
+ Collection<Offer.Operation> operations,
+ Filters filters);
+
+ public Status declineOffer(OfferID offerId) {
+ return declineOffer(offerId, Filters.newBuilder().build());
+ }
+
+ public native Status declineOffer(OfferID offerId, Filters filters);
+
+ public native Status reviveOffers();
+
+ public native Status suppressOffers();
+
+ public native Status acknowledgeStatusUpdate(TaskStatus status);
+
+ public native Status sendFrameworkMessage(ExecutorID executorId,
+ SlaveID slaveId,
+ byte[] data);
+
+ public native Status reconcileTasks(Collection<TaskStatus> statuses);
+
+ protected native void initialize();
+ protected native void finalize();
+
+ private final Scheduler scheduler;
+ private final FrameworkInfo framework;
+ private final String master;
+ private final boolean implicitAcknowledgements;
+ private final Credential credential;
+
+ private long __scheduler;
+ private long __driver;
+}