You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/12 02:22:16 UTC
[2/3] mesos git commit: Added Java binding for the new acceptOffers
API.
Added Java binding for the new acceptOffers API.
Review: https://reviews.apache.org/r/31873
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a6cb4b5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a6cb4b5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a6cb4b5
Branch: refs/heads/master
Commit: 6a6cb4b58874f0f4f934cd52024a3390dea03ed2
Parents: 4f455c2
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Mar 11 18:16:30 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Mar 11 18:21:53 2015 -0700
----------------------------------------------------------------------
src/examples/java/TestFramework.java | 22 ++++++-
src/java/jni/construct.cpp | 21 ++++++
.../org_apache_mesos_MesosSchedulerDriver.cpp | 67 ++++++++++++++++++++
.../org/apache/mesos/MesosSchedulerDriver.java | 4 ++
.../src/org/apache/mesos/SchedulerDriver.java | 24 +++++++
5 files changed, 135 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index ffc90b5..9e95369 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -64,7 +64,7 @@ public class TestFramework {
double MEM_PER_TASK = 128;
for (Offer offer : offers) {
- List<TaskInfo> tasks = new ArrayList<TaskInfo>();
+ Offer.Operation.Launch.Builder launch = Offer.Operation.Launch.newBuilder();
double offerCpus = 0;
double offerMem = 0;
for (Resource resource : offer.getResourcesList()) {
@@ -105,13 +105,29 @@ public class TestFramework {
.setExecutor(ExecutorInfo.newBuilder(executor))
.build();
- tasks.add(task);
+ launch.addTaskInfos(TaskInfo.newBuilder(task));
remainingCpus -= CPUS_PER_TASK;
remainingMem -= MEM_PER_TASK;
}
+
+ // NOTE: We use the new API `acceptOffers` here to launch tasks. The
+ // 'launchTasks' API will be deprecated.
+ List<OfferID> offerIds = new ArrayList<OfferID>();
+ offerIds.add(offer.getId());
+
+ List<Offer.Operation> operations = new ArrayList<Offer.Operation>();
+
+ Offer.Operation operation = Offer.Operation.newBuilder()
+ .setType(Offer.Operation.Type.LAUNCH)
+ .setLaunch(launch)
+ .build();
+
+ operations.add(operation);
+
Filters filters = Filters.newBuilder().setRefuseSeconds(1).build();
- driver.launchTasks(offer.getId(), tasks, filters);
+
+ driver.acceptOffers(offerIds, operations, filters);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/construct.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp
index e54c11e..1343208 100644
--- a/src/java/jni/construct.cpp
+++ b/src/java/jni/construct.cpp
@@ -380,3 +380,24 @@ Request construct(JNIEnv* env, jobject jobj)
return request;
}
+
+
+template <>
+Offer::Operation construct(JNIEnv* env, jobject jobj)
+{
+ jclass clazz = env->GetObjectClass(jobj);
+
+ // byte[] data = obj.toByteArray();
+ jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+ jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray);
+
+ jbyte* data = env->GetByteArrayElements(jdata, NULL);
+ jsize length = env->GetArrayLength(jdata);
+
+ const Offer::Operation& operation = parse<Offer::Operation>(data, length);
+
+ env->ReleaseByteArrayElements(jdata, data, 0);
+
+ return operation;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 4f0dad7..a89ebed 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -874,6 +874,73 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks
/*
* Class: org_apache_mesos_MesosSchedulerDriver
+ * Method: acceptOffers
+ * Signature: (Ljava/util/Collection;Ljava/util/Collection;Lorg/apache/mesos/Protos$Filters;)Lorg/apache/mesos/Protos/Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_acceptOffers__Ljava_util_Collection_2Ljava_util_Collection_2Lorg_apache_mesos_Protos_00024Filters_2
+ (JNIEnv* env, jobject thiz, jobject jofferIds, jobject joperations, jobject jfilters)
+{
+ // Construct C++ OfferIDs from each Java OfferIDs.
+ vector<OfferID> offers;
+ jclass clazz = env->GetObjectClass(jofferIds);
+
+ // Iterator iterator = offerIds.iterator();
+ jmethodID iterator =
+ env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+ jobject jiterator = env->CallObjectMethod(jofferIds, iterator);
+
+ clazz = env->GetObjectClass(jiterator);
+
+ // while (iterator.hasNext()) {
+ jmethodID hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+ jmethodID next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+ while (env->CallBooleanMethod(jiterator, hasNext)) {
+ // Object offerId = iterator.next();
+ jobject jofferId = env->CallObjectMethod(jiterator, next);
+ offers.push_back(construct<OfferID>(env, jofferId));
+ }
+
+ // Construct C++ Offer::Operations from each Java Offer.Operations.
+ vector<Offer::Operation> operations;
+ clazz = env->GetObjectClass(joperations);
+
+ // Iterator iterator = operations.iterator();
+ iterator = env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+ jiterator = env->CallObjectMethod(joperations, iterator);
+
+ clazz = env->GetObjectClass(jiterator);
+
+ // while (iterator.hasNext()) {
+ hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+ next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+ while (env->CallBooleanMethod(jiterator, hasNext)) {
+ // Object operation = iterator.next();
+ jobject joperation = env->CallObjectMethod(jiterator, next);
+ operations.push_back(construct<Offer::Operation>(env, joperation));
+ }
+
+ // Construct C++ Filters from the Java Filters.
+ const Filters& filters = construct<Filters>(env, jfilters);
+
+ // Now invoke the underlying driver.
+ clazz = env->GetObjectClass(thiz);
+
+ jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+ MesosSchedulerDriver* driver =
+ (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+ Status status = driver->acceptOffers(offers, operations, filters);
+
+ return convert<Status>(env, status);
+}
+
+
+/*
+ * Class: org_apache_mesos_MesosSchedulerDriver
* Method: declineOffer
* Signature: (Lorg/apache/mesos/Protos/OfferID;Lorg/apache/mesos/Protos/Filters;)Lorg/apache/mesos/Protos/Status;
*/
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index a1055a5..b9b2ea8 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -276,6 +276,10 @@ public class MesosSchedulerDriver implements SchedulerDriver {
public native Status killTask(TaskID taskId);
+ public native Status acceptOffers(Collection<OfferID> offerIds,
+ Collection<Offer.Operation> operations,
+ Filters filters);
+
public Status declineOffer(OfferID offerId) {
return declineOffer(offerId, Filters.newBuilder().build());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index d5b100a..183eec8 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -186,6 +186,30 @@ public interface SchedulerDriver {
Status killTask(TaskID taskId);
/**
+ * Accepts the given offers and performs a sequence of operations on
+ * those accepted offers. See Offer.Operation in mesos.proto for the
+ * set of available operations. Available resources are aggregated
+ * when multiple offers are provided. Note that all offers must
+ * belong to the same slave. Any unused resources will be considered
+ * declined. The specified filters are applied on all unused
+ * resources (see mesos.proto for a description of Filters).
+ *
+ * @param offerIds The collection of offer IDs.
+ * @param operations The collection of offer operations to perform.
+ * @param filters The filters to set for any remaining resources.
+ *
+ * @return The state of the driver after the call.
+ *
+ * @see OfferID
+ * @see Offer.Operation
+ * @see Filters
+ * @see Status
+ */
+ Status acceptOffers(Collection<OfferID> offerIds,
+ Collection<Offer.Operation> operations,
+ Filters filters);
+
+ /**
* Declines an offer in its entirety and applies the specified
* filters on the resources (see mesos.proto for a description of
* Filters). Note that this can be done at any time, it is not