You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/12 02:22:16 UTC

[2/3] mesos git commit: Added Java binding for the new acceptOffers API.

Added Java binding for the new acceptOffers API.

Review: https://reviews.apache.org/r/31873


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a6cb4b5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a6cb4b5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a6cb4b5

Branch: refs/heads/master
Commit: 6a6cb4b58874f0f4f934cd52024a3390dea03ed2
Parents: 4f455c2
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Mar 11 18:16:30 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Mar 11 18:21:53 2015 -0700

----------------------------------------------------------------------
 src/examples/java/TestFramework.java            | 22 ++++++-
 src/java/jni/construct.cpp                      | 21 ++++++
 .../org_apache_mesos_MesosSchedulerDriver.cpp   | 67 ++++++++++++++++++++
 .../org/apache/mesos/MesosSchedulerDriver.java  |  4 ++
 .../src/org/apache/mesos/SchedulerDriver.java   | 24 +++++++
 5 files changed, 135 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index ffc90b5..9e95369 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -64,7 +64,7 @@ public class TestFramework {
       double MEM_PER_TASK = 128;
 
       for (Offer offer : offers) {
-        List<TaskInfo> tasks = new ArrayList<TaskInfo>();
+        Offer.Operation.Launch.Builder launch = Offer.Operation.Launch.newBuilder();
         double offerCpus = 0;
         double offerMem = 0;
         for (Resource resource : offer.getResourcesList()) {
@@ -105,13 +105,29 @@ public class TestFramework {
             .setExecutor(ExecutorInfo.newBuilder(executor))
             .build();
 
-          tasks.add(task);
+          launch.addTaskInfos(TaskInfo.newBuilder(task));
 
           remainingCpus -= CPUS_PER_TASK;
           remainingMem -= MEM_PER_TASK;
         }
+
+        // NOTE: We use the new API `acceptOffers` here to launch tasks. The
+        // 'launchTasks' API will be deprecated.
+        List<OfferID> offerIds = new ArrayList<OfferID>();
+        offerIds.add(offer.getId());
+
+        List<Offer.Operation> operations = new ArrayList<Offer.Operation>();
+
+        Offer.Operation operation = Offer.Operation.newBuilder()
+          .setType(Offer.Operation.Type.LAUNCH)
+          .setLaunch(launch)
+          .build();
+
+        operations.add(operation);
+
         Filters filters = Filters.newBuilder().setRefuseSeconds(1).build();
-        driver.launchTasks(offer.getId(), tasks, filters);
+
+        driver.acceptOffers(offerIds, operations, filters);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/construct.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp
index e54c11e..1343208 100644
--- a/src/java/jni/construct.cpp
+++ b/src/java/jni/construct.cpp
@@ -380,3 +380,24 @@ Request construct(JNIEnv* env, jobject jobj)
 
   return request;
 }
+
+
+template <>
+Offer::Operation construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray);
+
+  jbyte* data = env->GetByteArrayElements(jdata, NULL);
+  jsize length = env->GetArrayLength(jdata);
+
+  const Offer::Operation& operation = parse<Offer::Operation>(data, length);
+
+  env->ReleaseByteArrayElements(jdata, data, 0);
+
+  return operation;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 4f0dad7..a89ebed 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -874,6 +874,73 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks
 
 /*
  * Class:     org_apache_mesos_MesosSchedulerDriver
+ * Method:    acceptOffers
+ * Signature: (Ljava/util/Collection;Ljava/util/Collection;Lorg/apache/mesos/Protos$Filters;)Lorg/apache/mesos/Protos/Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_acceptOffers__Ljava_util_Collection_2Ljava_util_Collection_2Lorg_apache_mesos_Protos_00024Filters_2
+  (JNIEnv* env, jobject thiz, jobject jofferIds, jobject joperations, jobject jfilters)
+{
+  // Construct C++ OfferIDs from each Java OfferIDs.
+  vector<OfferID> offers;
+  jclass clazz = env->GetObjectClass(jofferIds);
+
+  // Iterator iterator = offerIds.iterator();
+  jmethodID iterator =
+    env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+  jobject jiterator = env->CallObjectMethod(jofferIds, iterator);
+
+  clazz = env->GetObjectClass(jiterator);
+
+  // while (iterator.hasNext()) {
+  jmethodID hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+  jmethodID next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+  while (env->CallBooleanMethod(jiterator, hasNext)) {
+    // Object offerId = iterator.next();
+    jobject jofferId = env->CallObjectMethod(jiterator, next);
+    offers.push_back(construct<OfferID>(env, jofferId));
+  }
+
+  // Construct C++ Offer::Operations from each Java Offer.Operations.
+  vector<Offer::Operation> operations;
+  clazz = env->GetObjectClass(joperations);
+
+  // Iterator iterator = operations.iterator();
+  iterator = env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+  jiterator = env->CallObjectMethod(joperations, iterator);
+
+  clazz = env->GetObjectClass(jiterator);
+
+  // while (iterator.hasNext()) {
+  hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+  next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+  while (env->CallBooleanMethod(jiterator, hasNext)) {
+    // Object operation = iterator.next();
+    jobject joperation = env->CallObjectMethod(jiterator, next);
+    operations.push_back(construct<Offer::Operation>(env, joperation));
+  }
+
+  // Construct C++ Filters from the Java Filters.
+  const Filters& filters = construct<Filters>(env, jfilters);
+
+  // Now invoke the underlying driver.
+  clazz = env->GetObjectClass(thiz);
+
+  jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+  MesosSchedulerDriver* driver =
+    (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+  Status status = driver->acceptOffers(offers, operations, filters);
+
+  return convert<Status>(env, status);
+}
+
+
+/*
+ * Class:     org_apache_mesos_MesosSchedulerDriver
  * Method:    declineOffer
  * Signature: (Lorg/apache/mesos/Protos/OfferID;Lorg/apache/mesos/Protos/Filters;)Lorg/apache/mesos/Protos/Status;
  */

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index a1055a5..b9b2ea8 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -276,6 +276,10 @@ public class MesosSchedulerDriver implements SchedulerDriver {
 
   public native Status killTask(TaskID taskId);
 
+  public native Status acceptOffers(Collection<OfferID> offerIds,
+                                    Collection<Offer.Operation> operations,
+                                    Filters filters);
+
   public Status declineOffer(OfferID offerId) {
     return declineOffer(offerId, Filters.newBuilder().build());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index d5b100a..183eec8 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -186,6 +186,30 @@ public interface SchedulerDriver {
   Status killTask(TaskID taskId);
 
   /**
+   * Accepts the given offers and performs a sequence of operations on
+   * those accepted offers. See Offer.Operation in mesos.proto for the
+   * set of available operations. Available resources are aggregated
+   * when multiple offers are provided. Note that all offers must
+   * belong to the same slave. Any unused resources will be considered
+   * declined. The specified filters are applied on all unused
+   * resources (see mesos.proto for a description of Filters).
+   *
+   * @param offerIds    The collection of offer IDs.
+   * @param operations  The collection of offer operations to perform.
+   * @param filters     The filters to set for any remaining resources.
+   *
+   * @return            The state of the driver after the call.
+   *
+   * @see OfferID
+   * @see Offer.Operation
+   * @see Filters
+   * @see Status
+   */
+  Status acceptOffers(Collection<OfferID> offerIds,
+                      Collection<Offer.Operation> operations,
+                      Filters filters);
+
+  /**
    * Declines an offer in its entirety and applies the specified
    * filters on the resources (see mesos.proto for a description of
    * Filters). Note that this can be done at any time, it is not