You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/21 23:26:52 UTC

[2/8] mesos git commit: Updated Java bindings for explicit acknowledgements.

Updated Java bindings for explicit acknowledgements.

Review: https://reviews.apache.org/r/30972


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28d5f932
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28d5f932
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28d5f932

Branch: refs/heads/master
Commit: 28d5f9326ef2b7c6f2d2b5ece351d4ca09608100
Parents: ff4397a
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Feb 12 22:26:30 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Sat Feb 21 14:26:32 2015 -0800

----------------------------------------------------------------------
 .../org_apache_mesos_MesosSchedulerDriver.cpp   | 52 ++++++++++-
 .../org/apache/mesos/MesosSchedulerDriver.java  | 94 +++++++++++++++++++-
 src/java/src/org/apache/mesos/Scheduler.java    | 11 ++-
 .../src/org/apache/mesos/SchedulerDriver.java   | 15 ++++
 4 files changed, 162 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 3498930..4f0dad7 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -283,8 +283,8 @@ void JNIScheduler::statusUpdate(SchedulerDriver* driver,
   // scheduler.statusUpdate(driver, status);
   jmethodID statusUpdate =
     env->GetMethodID(clazz, "statusUpdate",
-		     "(Lorg/apache/mesos/SchedulerDriver;"
-		     "Lorg/apache/mesos/Protos$TaskStatus;)V");
+                     "(Lorg/apache/mesos/SchedulerDriver;"
+                     "Lorg/apache/mesos/Protos$TaskStatus;)V");
 
   jobject jstatus = convert<TaskStatus>(env, status);
 
@@ -498,13 +498,31 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_MesosSchedulerDriver_initialize
   jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
   jobject jmaster = env->GetObjectField(thiz, master);
 
+  // NOTE: Older versions (< 0.22.0) of MesosSchedulerDriver.java
+  // do not have the 'implicitAcknowledgements' field, so when None()
+  // is returned we default to the old behavior: implicit
+  // acknowledgements.
+  Result<jfieldID> implicitAcknowledgements =
+    getFieldID(env, clazz, "implicitAcknowledgements", "Z");
+
+  if (implicitAcknowledgements.isError()) {
+    return; // Exception has been thrown.
+  }
+
+  // Default to implicit acknowledgements, as done before 0.22.0.
+  jboolean jimplicitAcknowledgements = JNI_TRUE;
+  if (implicitAcknowledgements.isSome()) {
+    jimplicitAcknowledgements =
+      env->GetBooleanField(thiz, implicitAcknowledgements.get());
+  }
+
   // Get out the Credential passed into the constructor.
   // NOTE: Older versions (< 0.15.0) of MesosSchedulerDriver do not set
   // 'credential' field. To be backwards compatible we should safely
   // handle this case.
   Result<jfieldID> credential = getFieldID(env, clazz, "credential", "Lorg/apache/mesos/Protos$Credential;");
   if (credential.isError()) {
-    return;
+    return; // Exception has been thrown.
   }
 
   jobject jcredential = NULL;
@@ -520,12 +538,14 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_MesosSchedulerDriver_initialize
         scheduler,
         construct<FrameworkInfo>(env, jframework),
         construct<string>(env, jmaster),
+        construct(env, jimplicitAcknowledgements),
         construct<Credential>(env, jcredential));
   } else {
     driver = new MesosSchedulerDriver(
        scheduler,
        construct<FrameworkInfo>(env, jframework),
-       construct<string>(env, jmaster));
+       construct<string>(env, jmaster),
+       construct(env, jimplicitAcknowledgements));
   }
 
   // Initialize the __driver variable
@@ -648,6 +668,30 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_join
 
 /*
  * Class:     org_apache_mesos_MesosSchedulerDriver
+ * Method:    acknowledgeStatusUpdate
+ * Signature: (Lorg/apache/mesos/Protos/TaskStatus;)Lorg/apache/mesos/Protos/Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_acknowledgeStatusUpdate
+  (JNIEnv* env, jobject thiz, jobject jtaskStatus)
+{
+  // Construct a C++ TaskID from the Java TaskId.
+  const TaskStatus& taskStatus = construct<TaskStatus>(env, jtaskStatus);
+
+  // Now invoke the underlying driver.
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+  MesosSchedulerDriver* driver =
+    (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+  Status status = driver->acknowledgeStatusUpdate(taskStatus);
+
+  return convert<Status>(env, status);
+}
+
+
+/*
+ * Class:     org_apache_mesos_MesosSchedulerDriver
  * Method:    sendFrameworkMessage
  * Signature: (Lorg/apache/mesos/Protos/ExecutorID;Lorg/apache/mesos/Protos/SlaveID;[B)Lorg/apache/mesos/Protos/Status;
  */

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index 6ad03ce..a1055a5 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -106,14 +106,15 @@ public class MesosSchedulerDriver implements SchedulerDriver {
     this.scheduler = scheduler;
     this.framework = framework;
     this.master = master;
+    this.implicitAcknowledgements = true;
     this.credential = null;
 
     initialize();
   }
 
   /**
-   * Same as the above constructor, except that it accepts 'credential'
-   * as a parameter.
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'credential' parameter.
    *
    * @param scheduler   The scheduler implementation which callbacks are invoked
    *                    upon scheduler events.
@@ -146,6 +147,92 @@ public class MesosSchedulerDriver implements SchedulerDriver {
     this.scheduler = scheduler;
     this.framework = framework;
     this.master = master;
+    this.implicitAcknowledgements = true;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  /**
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'implicitAcknowledgements' parameter.
+   *
+   * @param scheduler   The scheduler implementation which callbacks are invoked
+   *                    upon scheduler events.
+   * @param framework   The frameworkInfo describing the current framework.
+   * @param master      The address to the currently active Mesos master.
+   * @param implicitAcknowledgements  Whether the driver should send
+   *            acknowledgements on behalf of the scheduler. Setting this to
+   *            false allows schedulers to perform their own acknowledgements,
+   *            which enables asynchronous / batch processing of status updates.
+   */
+  public MesosSchedulerDriver(Scheduler scheduler,
+                              FrameworkInfo framework,
+                              String master,
+                              boolean implicitAcknowledgements) {
+
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null Scheduler");
+    }
+
+    if (framework == null) {
+      throw new NullPointerException("Not expecting a null FrameworkInfo");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.implicitAcknowledgements = implicitAcknowledgements;
+    this.credential = null;
+
+    initialize();
+  }
+
+  /**
+   * Same as the other constructors, except that it accepts the newly
+   * introduced 'implicitAcknowledgements' and 'credentials' parameters.
+   *
+   * @param scheduler   The scheduler implementation which callbacks are invoked
+   *                    upon scheduler events.
+   * @param framework   The frameworkInfo describing the current framework.
+   * @param master      The address to the currently active Mesos master.
+   * @param implicitAcknowledgements  Whether the driver should send
+   *            acknowledgements on behalf of the scheduler. Setting this to
+   *            false allows schedulers to perform their own acknowledgements,
+   *            which enables asynchronous / batch processing of status updates.
+   * @param credential  The credentials that will be used used to authenticate
+   *                    calls from this scheduler.
+   */
+  public MesosSchedulerDriver(Scheduler scheduler,
+                              FrameworkInfo framework,
+                              String master,
+                              boolean implicitAcknowledgements,
+                              Credential credential) {
+
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null Scheduler");
+    }
+
+    if (framework == null) {
+      throw new NullPointerException("Not expecting a null FrameworkInfo");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    if (credential == null) {
+      throw new NullPointerException("Not expecting a null credential");
+    }
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.implicitAcknowledgements = implicitAcknowledgements;
     this.credential = credential;
 
     initialize();
@@ -197,6 +284,8 @@ public class MesosSchedulerDriver implements SchedulerDriver {
 
   public native Status reviveOffers();
 
+  public native Status acknowledgeStatusUpdate(TaskStatus status);
+
   public native Status sendFrameworkMessage(ExecutorID executorId,
                                             SlaveID slaveId,
                                             byte[] data);
@@ -209,6 +298,7 @@ public class MesosSchedulerDriver implements SchedulerDriver {
   private final Scheduler scheduler;
   private final FrameworkInfo framework;
   private final String master;
+  private final boolean implicitAcknowledgements;
   private final Credential credential;
 
   private long __scheduler;

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/Scheduler.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/Scheduler.java b/src/java/src/org/apache/mesos/Scheduler.java
index 337e455..0e02f89 100644
--- a/src/java/src/org/apache/mesos/Scheduler.java
+++ b/src/java/src/org/apache/mesos/Scheduler.java
@@ -112,12 +112,15 @@ public interface Scheduler {
   /**
    * Invoked when the status of a task has changed (e.g., a slave is
    * lost and so the task is lost, a task finishes and an executor
-   * sends a status update saying so, etc). Note that returning from
-   * this callback _acknowledges_ receipt of this status update! If
-   * for whatever reason the scheduler aborts during this callback (or
+   * sends a status update saying so, etc). If implicit
+   * acknowledgements are being used, then returning from this
+   * callback _acknowledges_ receipt of this status update! If for
+   * whatever reason the scheduler aborts during this callback (or
    * the process exits) another status update will be delivered (note,
    * however, that this is currently not true if the slave sending the
-   * status update is lost/fails during that time).
+   * status update is lost/fails during that time). If explicit
+   * acknowledgements are in use, the scheduler must acknowledge this
+   * status on the driver.
    *
    * @param driver The driver that was used to run this scheduler.
    * @param status The status update, which includes the task ID and status.

http://git-wip-us.apache.org/repos/asf/mesos/blob/28d5f932/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index e2d1f92..d5b100a 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -227,6 +227,21 @@ public interface SchedulerDriver {
   Status reviveOffers();
 
   /**
+   * Acknowledges the status update. This should only be called
+   * once the status update is processed durably by the scheduler.
+   * Not that explicit acknowledgements must be requested via the
+   * constructor argument, otherwise a call to this method will
+   * cause the driver to crash.
+   *
+   * @param status  The status to acknowledge.
+   *
+   * @return        The state of the driver after the call.
+   *
+   * @see TaskStatus
+   */
+  Status acknowledgeStatusUpdate(TaskStatus status);
+
+  /**
    * Sends a message from the framework to one of its executors. These
    * messages are best effort; do not expect a framework message to be
    * retransmitted in any reliable fashion.