You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/02 15:42:43 UTC
[1/7] mesos git commit: Added an example test for the V0/V1 Mesos
java implementation.
Repository: mesos
Updated Branches:
refs/heads/master 9567fb420 -> 74ceb051c
Added an example test for the V0/V1 Mesos java implementation.
Review: https://reviews.apache.org/r/50254
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/74ceb051
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/74ceb051
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/74ceb051
Branch: refs/heads/master
Commit: 74ceb051c454326ace02a7c670c0ea4d911519c2
Parents: 68c27d1
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:36:34 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
configure.ac | 2 +
src/Makefile.am | 8 +-
src/examples/java/V1TestFramework.java | 386 ++++++++++++++++++++++++++++
src/examples/java/v1-test-framework.in | 39 +++
src/tests/examples_tests.cpp | 4 +
src/tests/java_v0_framework_test.sh | 41 +++
src/tests/java_v1_framework_test.sh | 40 +++
7 files changed, 518 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index d213690..f82c631 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1069,6 +1069,8 @@ __EOF__
[chmod +x src/examples/java/test-multiple-executors-framework])
AC_CONFIG_FILES([src/examples/java/test-log],
[chmod +x src/examples/java/test-log])
+ AC_CONFIG_FILES([src/examples/java/v1-test-framework],
+ [chmod +x src/examples/java/v1-test-framework])
AC_CONFIG_FILES([src/java/mesos.pom])
AC_DEFINE([MESOS_HAS_JAVA])
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b9cc040..30d8f72 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1532,7 +1532,8 @@ EXAMPLES_SOURCE = \
$(srcdir)/examples/java/TestExecutor.java \
$(srcdir)/examples/java/TestFramework.java \
$(srcdir)/examples/java/TestLog.java \
- $(srcdir)/examples/java/TestMultipleExecutorsFramework.java
+ $(srcdir)/examples/java/TestMultipleExecutorsFramework.java \
+ $(srcdir)/examples/java/V1TestFramework.java
EXTRA_DIST += $(EXAMPLES_SOURCE)
@@ -2236,7 +2237,8 @@ EXAMPLESCRIPTSJAVA = \
examples/java/test-exception-framework \
examples/java/test-framework \
examples/java/test-log \
- examples/java/test-multiple-executors-framework
+ examples/java/test-multiple-executors-framework \
+ examples/java/v1-test-framework
check_SCRIPTS += $(EXAMPLESCRIPTSJAVA)
mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSJAVA)
@@ -2270,6 +2272,8 @@ dist_check_SCRIPTS += \
tests/java_exception_test.sh \
tests/java_framework_test.sh \
tests/java_log_test.sh \
+ tests/java_v0_framework_test.sh \
+ tests/java_v1_framework_test.sh \
tests/no_executor_framework_test.sh \
tests/persistent_volume_framework_test.sh \
tests/python_framework_test.sh \
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/V1TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/V1TestFramework.java b/src/examples/java/V1TestFramework.java
new file mode 100644
index 0000000..a41edbc
--- /dev/null
+++ b/src/examples/java/V1TestFramework.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import java.util.concurrent.locks.*;
+
+import org.apache.mesos.v1.*;
+
+import org.apache.mesos.v1.Protos.*;
+
+import org.apache.mesos.v1.scheduler.JNIMesos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.apache.mesos.v1.scheduler.V0Mesos;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+public class V1TestFramework {
+ static class TestScheduler implements Scheduler {
+
+ public TestScheduler(
+ String master,
+ FrameworkInfo framework,
+ ExecutorInfo executor) {
+ this(master, framework, executor, 5);
+ }
+
+ public TestScheduler(
+ String master,
+ FrameworkInfo framework,
+ ExecutorInfo executor,
+ int totalTasks) {
+ this.framework = framework;
+ this.executor = executor;
+ this.totalTasks = totalTasks;
+ this.state = State.DISCONNECTED;
+ }
+
+    // TODO(anand): Synchronize on `state` instead.
+    //
+    // Invoked once a connection with the master has been established. Marks
+    // the scheduler as connected and starts a timer that invokes
+    // `doReliableRegistration()` immediately and then once every second.
+    @Override
+    public synchronized void connected(final Mesos mesos) {
+      System.out.println("Connected");
+
+      state = State.CONNECTED;
+
+      // Stop a timer left over from an earlier connection before replacing
+      // it. Dropping the reference alone would leave its thread firing
+      // forever, since only the timer stored in `retryTimer` is ever
+      // cancelled.
+      cancelRetryTimer();
+
+      retryTimer = new Timer();
+      retryTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          doReliableRegistration(mesos);
+        }
+      }, 0, 1000); // Repeat every 1 second.
+    }
+
+    // Invoked when the connection with the master is lost. Resets the state
+    // and stops the SUBSCRIBE retry timer (if one is active).
+    @Override
+    public synchronized void disconnected(Mesos mesos) {
+      System.out.println("Disconnected");
+
+      // Reference the constant through the enum type rather than through
+      // the `state` instance field.
+      state = State.DISCONNECTED;
+      cancelRetryTimer();
+    }
+
+    // Handles an event delivered by Mesos: logs its type and forwards OFFERS
+    // and UPDATE events to `offers()` and `update()` respectively. An ERROR
+    // event terminates the JVM with exit code 1.
+    @Override
+    public synchronized void received(Mesos mesos, Event event) {
+      switch (event.getType()) {
+        case SUBSCRIBED: {
+          frameworkId = event.getSubscribed().getFrameworkId();
+          state = State.SUBSCRIBED;
+
+          System.out.println("Subscribed with ID " + frameworkId);
+          break;
+        }
+
+        case OFFERS: {
+          System.out.println("Received an OFFERS event");
+
+          offers(mesos, event.getOffers().getOffersList());
+          break;
+        }
+
+        case RESCIND: {
+          System.out.println("Received an RESCIND event");
+          break;
+        }
+
+        case UPDATE: {
+          System.out.println("Received an UPDATE event");
+
+          update(mesos, event.getUpdate().getStatus());
+          break;
+        }
+
+        case MESSAGE: {
+          System.out.println("Received a MESSAGE event");
+          break;
+        }
+
+        case FAILURE: {
+          System.out.println("Received a FAILURE event");
+          break;
+        }
+
+        case ERROR: {
+          System.out.println("Received an ERROR event");
+          System.exit(1);
+
+          // `System.exit()` does not return normally, but terminate the case
+          // explicitly so it can never fall through into the HEARTBEAT
+          // handling below.
+          break;
+        }
+
+        case HEARTBEAT: {
+          // TODO(anand): Force reconnection with the master upon lack
+          // of heartbeats.
+          System.out.println("Received a HEARTBEAT event");
+          break;
+        }
+
+        case UNKNOWN: {
+          System.out.println("Received an UNKNOWN event");
+          break;
+        }
+      }
+    }
+
+    // Sends a SUBSCRIBE call carrying the framework info while the scheduler
+    // is connected but not yet subscribed. Once the scheduler is subscribed
+    // or disconnected, the retry timer is cancelled instead.
+    public synchronized void doReliableRegistration(Mesos mesos) {
+      if (state != State.SUBSCRIBED && state != State.DISCONNECTED) {
+        Call.Subscribe subscribe = Call.Subscribe.newBuilder()
+          .setFrameworkInfo(framework)
+          .build();
+
+        Call subscribeCall = Call.newBuilder()
+          .setType(Call.Type.SUBSCRIBE)
+          .setSubscribe(subscribe)
+          .build();
+
+        mesos.send(subscribeCall);
+        return;
+      }
+
+      // Nothing left to retry.
+      cancelRetryTimer();
+    }
+
+    // Stops the currently active retry timer (if any) and forgets it.
+    private void cancelRetryTimer() {
+      Timer active = retryTimer;
+      retryTimer = null;
+
+      if (active == null) {
+        return;
+      }
+
+      active.cancel();
+      active.purge();
+    }
+
+ public void offers(Mesos mesos, List<Offer> offers) {
+ double CPUS_PER_TASK = 1;
+ double MEM_PER_TASK = 128;
+
+ for (Offer offer : offers) {
+ Offer.Operation.Launch.Builder launch =
+ Offer.Operation.Launch.newBuilder();
+
+ double offerCpus = 0;
+ double offerMem = 0;
+ for (Resource resource : offer.getResourcesList()) {
+ if (resource.getName().equals("cpus")) {
+ offerCpus += resource.getScalar().getValue();
+ } else if (resource.getName().equals("mem")) {
+ offerMem += resource.getScalar().getValue();
+ }
+ }
+
+ System.out.println(
+ "Received offer " + offer.getId().getValue() + " with cpus: " +
+ offerCpus + " and mem: " + offerMem);
+
+ double remainingCpus = offerCpus;
+ double remainingMem = offerMem;
+ while (launchedTasks < totalTasks &&
+ remainingCpus >= CPUS_PER_TASK &&
+ remainingMem >= MEM_PER_TASK) {
+ TaskID taskId = TaskID.newBuilder()
+ .setValue(Integer.toString(launchedTasks++)).build();
+
+ System.out.println("Launching task " + taskId.getValue() +
+ " using offer " + offer.getId().getValue());
+
+ TaskInfo task = TaskInfo.newBuilder()
+ .setName("task " + taskId.getValue())
+ .setTaskId(taskId)
+ .setAgentId(offer.getAgentId())
+ .addResources(Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(CPUS_PER_TASK)
+ .build()))
+ .addResources(Resource.newBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(MEM_PER_TASK)
+ .build()))
+ .setExecutor(ExecutorInfo.newBuilder(executor))
+ .build();
+
+ launch.addTaskInfos(TaskInfo.newBuilder(task));
+
+ remainingCpus -= CPUS_PER_TASK;
+ remainingMem -= MEM_PER_TASK;
+ }
+
+ mesos.send(Call.newBuilder()
+ .setType(Call.Type.ACCEPT)
+ .setFrameworkId(frameworkId)
+ .setAccept(Call.Accept.newBuilder()
+ .addOfferIds(offer.getId())
+ .addOperations(Offer.Operation.newBuilder()
+ .setType(Offer.Operation.Type.LAUNCH)
+ .setLaunch(launch)
+ .build())
+ .setFilters(Filters.newBuilder()
+ .setRefuseSeconds(1)
+ .build()))
+ .build());
+ }
+ }
+
+    // Handles a task status update:
+    //   - counts finished tasks and wakes up `main()` once all `totalTasks`
+    //     tasks have finished,
+    //   - aborts the JVM (exit code 1) if a task is lost, killed or failed,
+    //   - otherwise acknowledges the update, provided it carries a UUID.
+    public void update(Mesos mesos, TaskStatus status) {
+      System.out.println(
+          "Status update: task " + status.getTaskId().getValue() +
+          " is in state " + status.getState().getValueDescriptor().getName());
+
+      if (status.getState() == TaskState.TASK_FINISHED) {
+        finishedTasks++;
+        System.out.println("Finished tasks: " + finishedTasks);
+        if (finishedTasks == totalTasks) {
+          lock.lock();
+          try {
+            finished = true;
+            finishedCondtion.signal();
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+
+      if (status.getState() == TaskState.TASK_LOST ||
+          status.getState() == TaskState.TASK_KILLED ||
+          status.getState() == TaskState.TASK_FAILED) {
+        System.err.println(
+            "Aborting because task " + status.getTaskId().getValue() +
+            " is in unexpected state " +
+            status.getState().getValueDescriptor().getName() +
+            " with reason '" +
+            status.getReason().getValueDescriptor().getName() + "'" +
+            " from source '" +
+            status.getSource().getValueDescriptor().getName() + "'" +
+            " with message '" + status.getMessage() + "'");
+
+        System.exit(1);
+      }
+
+      // Only status updates that carry a UUID may be acknowledged. Updates
+      // without one are not part of the reliable (retried) update stream,
+      // so an ACKNOWLEDGE with an empty UUID would be invalid.
+      if (!status.hasUuid()) {
+        return;
+      }
+
+      mesos.send(Call.newBuilder()
+        .setType(Call.Type.ACKNOWLEDGE)
+        .setFrameworkId(frameworkId)
+        .setAcknowledge(Call.Acknowledge.newBuilder()
+          .setAgentId(status.getAgentId())
+          .setTaskId(status.getTaskId())
+          .setUuid(status.getUuid())
+          .build())
+        .build());
+    }
+
+ private enum State {
+ DISCONNECTED,
+ CONNECTED,
+ SUBSCRIBED
+ }
+
+ private FrameworkInfo framework;
+ private FrameworkID frameworkId;
+ private final ExecutorInfo executor;
+ private final int totalTasks;
+ private int launchedTasks = 0;
+ private int finishedTasks = 0;
+ private State state;
+ private Timer retryTimer = null;
+ }
+
+  // Prints the expected command line to stderr. The trailing `[tasks]`
+  // argument is optional and sets the number of tasks to run.
+  private static void usage() {
+    String name = V1TestFramework.class.getName();
+    System.err.println("Usage: " + name + " master version{0,1} [tasks]");
+  }
+
+  // Entry point. Expects `master version{0,1}` plus an optional number of
+  // tasks to run. Version 1 talks to the master through `JNIMesos`, version 0
+  // goes through the `V0Mesos` adapter. Blocks until the scheduler reports
+  // that all tasks have finished and then exits with status 0; invalid
+  // arguments print the usage and exit with status 1.
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2 || args.length > 3) {
+      usage();
+      System.exit(1);
+    }
+
+    // Parse all numeric arguments up front so that malformed input produces
+    // the usage message instead of an uncaught exception.
+    int version = -1;
+    int totalTasks = -1;
+    try {
+      version = Integer.parseInt(args[1]);
+      if (args.length == 3) {
+        totalTasks = Integer.parseInt(args[2]);
+      }
+    } catch (NumberFormatException e) {
+      System.err.println("Invalid numeric argument: " + e.getMessage());
+      usage();
+      System.exit(1);
+    }
+
+    if (version != 0 && version != 1) {
+      usage();
+      System.exit(1);
+    }
+
+    // With a non-positive task count the "all tasks finished" condition
+    // could never be reached and the framework would wait forever.
+    if (args.length == 3 && totalTasks <= 0) {
+      System.err.println("The number of tasks must be positive");
+      usage();
+      System.exit(1);
+    }
+
+    String uri = new File("./test-executor").getCanonicalPath();
+
+    ExecutorInfo executor = ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue("default"))
+      .setCommand(CommandInfo.newBuilder().setValue(uri))
+      .setName("Test Executor (Java)")
+      .setSource("java_test")
+      .build();
+
+    FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
+      .setUser(System.getProperty("user.name", "default-user"))
+      .setName("V" + version + " Test Framework (Java)");
+
+    Credential.Builder credentialBuilder = null;
+
+    if (System.getenv("DEFAULT_PRINCIPAL") != null) {
+      frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
+
+      if (System.getenv("DEFAULT_SECRET") != null) {
+        credentialBuilder = Credential.newBuilder()
+          .setPrincipal(System.getenv("DEFAULT_PRINCIPAL"))
+          .setSecret(System.getenv("DEFAULT_SECRET"));
+      }
+    }
+
+    // Build the framework info once and share it between the scheduler and
+    // the V0 adapter.
+    FrameworkInfo framework = frameworkBuilder.build();
+
+    Scheduler scheduler = args.length == 2
+      ? new TestScheduler(args[0], framework, executor)
+      : new TestScheduler(args[0], framework, executor, totalTasks);
+
+    Mesos mesos;
+    if (credentialBuilder != null) {
+      mesos = version == 1
+        ? new JNIMesos(scheduler, args[0], credentialBuilder.build())
+        : new V0Mesos(scheduler,
+                      framework,
+                      args[0],
+                      credentialBuilder.build());
+    } else {
+      mesos = version == 1
+        ? new JNIMesos(scheduler, args[0])
+        : new V0Mesos(scheduler, framework, args[0]);
+    }
+
+    // Wait until `TestScheduler.update()` signals that every task finished.
+    lock.lock();
+    try {
+      while (!finished) {
+        finishedCondtion.await();
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    System.exit(0);
+  }
+
+ static boolean finished = false;
+ final static Lock lock = new ReentrantLock();
+ final static Condition finishedCondtion = lock.newCondition();
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/v1-test-framework.in
----------------------------------------------------------------------
diff --git a/src/examples/java/v1-test-framework.in b/src/examples/java/v1-test-framework.in
new file mode 100644
index 0000000..6e5e581
--- /dev/null
+++ b/src/examples/java/v1-test-framework.in
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# Launcher for the V1TestFramework Java example. All arguments are passed
+# through to the Java program unchanged.
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Locate Java from environment or use configure discovered location.
+JAVA_HOME=${JAVA_HOME-@JAVA_HOME@}
+JAVA=${JAVA-${JAVA_HOME}/bin/java}
+
+# Use colors for errors.
+. "${MESOS_SOURCE_DIR}/support/colors.sh"
+
+PROTOBUF_JAR=@PROTOBUF_JAR@
+
+test ! -e "${PROTOBUF_JAR}" && \
+  echo "${RED}Failed to find ${PROTOBUF_JAR}${NORMAL}" && \
+  exit 1
+
+MESOS_JAR=${MESOS_BUILD_DIR}/src/java/target/mesos-@PACKAGE_VERSION@.jar
+
+test ! -e "${MESOS_JAR}" && \
+  echo "${RED}Failed to find ${MESOS_JAR}${NORMAL}" && \
+  exit 1
+
+EXAMPLES_JAR=${MESOS_BUILD_DIR}/src/examples.jar
+
+test ! -e "${EXAMPLES_JAR}" && \
+  echo "${RED}Failed to find ${EXAMPLES_JAR}${NORMAL}" && \
+  exit 1
+
+# Need to run in the directory containing this script so that the
+# framework is able to find the executor.
+cd "$(dirname "${0}")" || exit 1
+
+# NOTE: ${JAVA} is intentionally left unquoted so that a JAVA value
+# containing extra arguments keeps working.
+exec ${JAVA} -cp "${PROTOBUF_JAR}:${MESOS_JAR}:${EXAMPLES_JAR}" \
+  "-Djava.library.path=${MESOS_BUILD_DIR}/src/.libs" \
+  V1TestFramework "${@}"
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index cac5304..52fac33 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -38,6 +38,10 @@ TEST_SCRIPT(ExamplesTest, DiskFullFramework,
TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh")
+
+// TODO(anand): Parameterize these tests on version.
+TEST_SCRIPT(ExamplesTest, V0JavaFramework, "java_v0_framework_test.sh")
+TEST_SCRIPT(ExamplesTest, V1JavaFramework, "java_v1_framework_test.sh")
#endif
#ifdef MESOS_HAS_PYTHON
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v0_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v0_framework_test.sh b/src/tests/java_v0_framework_test.sh
new file mode 100755
index 0000000..12daae4
--- /dev/null
+++ b/src/tests/java_v0_framework_test.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+# Runs the Java V1 example framework against a local cluster, passing `0` as
+# the version argument.
+#
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+# Check the variables themselves: grepping the output of `env` would also
+# match unrelated variables whose name or value contains the same text.
+test -z "${MESOS_SOURCE_DIR}" && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+test -z "${MESOS_BUILD_DIR}" && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source "${MESOS_SOURCE_DIR}/support/atexit.sh"
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Lower the authentication timeout to speed up the test (the master
+# may drop the authentication message while it is recovering).
+export MESOS_AUTHENTICATION_TIMEOUT=200ms
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec "${MESOS_BUILD_DIR}/src/examples/java/v1-test-framework" local 0
http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v1_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v1_framework_test.sh b/src/tests/java_v1_framework_test.sh
new file mode 100755
index 0000000..c49e11c
--- /dev/null
+++ b/src/tests/java_v1_framework_test.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+# Runs the Java V1 example framework against a local cluster, passing `1` as
+# the version argument.
+#
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+# Check the variables themselves: grepping the output of `env` would also
+# match unrelated variables whose name or value contains the same text.
+test -z "${MESOS_SOURCE_DIR}" && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+test -z "${MESOS_BUILD_DIR}" && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source "${MESOS_SOURCE_DIR}/support/atexit.sh"
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set the connection delay to 0 to speed up the tests.
+export MESOS_CONNECTION_DELAY_MAX=0ms
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec "${MESOS_BUILD_DIR}/src/examples/java/v1-test-framework" local 1
[6/7] mesos git commit: Added native implementation for the V0 Mesos
Adapter.
Posted by an...@apache.org.
Added native implementation for the V0 Mesos Adapter.
This change adds the C++ implementation for the JAVA V0 to V1 Mesos
implementation adapter (driver + scheduler).
Review: https://reviews.apache.org/r/50253
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68c27d10
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68c27d10
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68c27d10
Branch: refs/heads/master
Commit: 68c27d1095dd33e569a96a05337dbd961cfdd6fe
Parents: bca68f6
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:23:05 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 9 +-
.../org_apache_mesos_v1_scheduler_V0Mesos.cpp | 893 +++++++++++++++++++
2 files changed, 901 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b02b5f..b9cc040 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1563,6 +1563,7 @@ libjava_la_SOURCES = \
java/jni/org_apache_mesos_state_Variable.cpp \
java/jni/org_apache_mesos_state_ZooKeeperState.cpp \
java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp \
+ java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp \
jvm/jvm.cpp \
jvm/jvm.hpp \
jvm/java/io.hpp \
@@ -1605,7 +1606,8 @@ nodist_libjava_la_SOURCES = \
java/jni/org_apache_mesos_state_LogState.h \
java/jni/org_apache_mesos_state_Variable.h \
java/jni/org_apache_mesos_state_ZooKeeperState.h \
- java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h
+ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h \
+ java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h
BUILT_SOURCES += $(nodist_libjava_la_SOURCES)
@@ -1654,6 +1656,11 @@ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h: $(MESOS_JAR)
-classpath $(MESOS_JAR):@PROTOBUF_JAR@ \
org.apache.mesos.v1.scheduler.JNIMesos
+java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h: $(MESOS_JAR)
+ $(JAVA_HOME)/bin/javah -d java/jni \
+ -classpath $(MESOS_JAR):@PROTOBUF_JAR@ \
+ org.apache.mesos.v1.scheduler.V0Mesos
+
$(EXAMPLES_JAR): $(EXAMPLES_SOURCE)
@echo "Building examples.jar ..."
$(MKDIR_P) examples/java
http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
new file mode 100644
index 0000000..7febe95
--- /dev/null
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -0,0 +1,893 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/unreachable.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "jvm/jvm.hpp"
+
+#include "master/constants.hpp"
+#include "master/validation.hpp"
+
+#include "convert.hpp"
+#include "construct.hpp"
+#include "org_apache_mesos_v1_scheduler_V0Mesos.h"
+
+using namespace mesos;
+using namespace mesos::internal::master;
+
+using std::queue;
+using std::string;
+using std::vector;
+
+using mesos::internal::devolve;
+using mesos::internal::evolve;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+using process::Clock;
+using process::Owned;
+using process::Timer;
+
+class V0ToV1AdapterProcess; // Forward declaration.
+
+// This interface acts as an adapter from the v0 (driver + scheduler) to the
+// v1 Mesos scheduler.
+class V0ToV1Adapter : public mesos::Scheduler, public v1::scheduler::MesosBase
+{
+public:
+ V0ToV1Adapter(
+ JNIEnv* env,
+ jweak jmesos,
+ const FrameworkInfo& frameworkInfo,
+ const string& master,
+ const Option<Credential>& credential);
+
+ virtual ~V0ToV1Adapter();
+
+ // v0 Scheduler interface overrides.
+ virtual void registered(
+ SchedulerDriver* driver,
+ const FrameworkID& frameworkId,
+ const MasterInfo& masterInfo) override;
+
+ virtual void reregistered(
+ SchedulerDriver* driver,
+ const MasterInfo& masterInfo) override;
+
+ virtual void disconnected(SchedulerDriver* driver) override;
+
+ virtual void resourceOffers(
+ SchedulerDriver* driver,
+ const vector<Offer>& offers) override;
+
+ virtual void offerRescinded(
+ SchedulerDriver* driver,
+ const OfferID& offerId) override;
+
+ virtual void statusUpdate(
+ SchedulerDriver* driver,
+ const TaskStatus& status) override;
+
+ virtual void frameworkMessage(
+ SchedulerDriver* driver,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ const string& data) override;
+
+ virtual void slaveLost(
+ SchedulerDriver* driver,
+ const SlaveID& slaveId) override;
+
+ virtual void executorLost(
+ SchedulerDriver* driver,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ int status) override;
+
+ virtual void error(
+ SchedulerDriver* driver,
+ const string& message) override;
+
+ // v1 MesosBase interface overrides.
+ virtual void send(const v1::scheduler::Call& call) override;
+
+ virtual void reconnect() override
+ {
+ // The driver does not support explicit reconnection with the master.
+ UNREACHABLE();
+ }
+
+ process::Owned<V0ToV1AdapterProcess> process;
+
+private:
+ Owned<MesosSchedulerDriver> driver;
+};
+
+
+// The process (below) is responsible for ensuring synchronized access between
+// callbacks received from the driver and calls invoked by the adapter.
+class V0ToV1AdapterProcess : public process::Process<V0ToV1AdapterProcess>
+{
+public:
+ V0ToV1AdapterProcess(JNIEnv* env, jweak jmesos);
+
+ virtual ~V0ToV1AdapterProcess() = default;
+
+ void registered(const FrameworkID& frameworkId);
+
+ void reregistered();
+
+ void disconnected();
+
+ void resourceOffers(const vector<Offer>& offers);
+
+ void offerRescinded(const OfferID& offerId);
+
+ void statusUpdate(const TaskStatus& status);
+
+ void frameworkMessage(
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ const string& data);
+
+ void slaveLost(const SlaveID& slaveId);
+
+ void executorLost(
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ int status);
+
+ void error(const string& message);
+
+ void send(SchedulerDriver*, const v1::scheduler::Call& call);
+
+ JavaVM* jvm;
+ JNIEnv* env;
+ jweak jmesos;
+
+protected:
+ void received(const Event& event);
+
+ void _received();
+
+ void __received(const Event& event);
+
+ void heartbeat();
+
+ void disconnect();
+
+private:
+ bool subscribeCall;
+ const Duration interval;
+ queue<Event> pending;
+ Option<FrameworkID> frameworkId;
+ Option<Timer> heartbeatTimer;
+};
+
+
+V0ToV1Adapter::V0ToV1Adapter(
+ JNIEnv* env,
+ jweak jmesos,
+ const FrameworkInfo& frameworkInfo,
+ const string& master,
+ const Option<Credential>& credential)
+ : process(new V0ToV1AdapterProcess(env, jmesos))
+{
+ spawn(process.get());
+
+ driver.reset(
+ credential.isSome()
+ // Disable implicit acks.
+ ? new MesosSchedulerDriver(
+ this, frameworkInfo, master, false, credential.get())
+ : new MesosSchedulerDriver(this, frameworkInfo, master, false));
+
+ driver->start();
+}
+
+
+V0ToV1Adapter::~V0ToV1Adapter()
+{
+ terminate(process.get());
+ wait(process.get());
+}
+
+
+void V0ToV1Adapter::error(
+ SchedulerDriver*,
+ const string& message)
+{
+ process::dispatch(process.get(), &V0ToV1AdapterProcess::error, message);
+}
+
+
+void V0ToV1Adapter::executorLost(
+ SchedulerDriver*,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ int status)
+{
+ process::dispatch(
+ process.get(),
+ &V0ToV1AdapterProcess::executorLost,
+ executorId,
+ slaveId,
+ status);
+}
+
+
+void V0ToV1Adapter::slaveLost(
+ SchedulerDriver*,
+ const SlaveID& slaveId)
+{
+ process::dispatch(process.get(), &V0ToV1AdapterProcess::slaveLost, slaveId);
+}
+
+
+void V0ToV1Adapter::frameworkMessage(
+ SchedulerDriver*,
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ const string& data)
+{
+ process::dispatch(
+ process.get(),
+ &V0ToV1AdapterProcess::frameworkMessage,
+ executorId,
+ slaveId,
+ data);
+}
+
+
+void V0ToV1Adapter::statusUpdate(
+ SchedulerDriver*,
+ const TaskStatus& status)
+{
+ process::dispatch(process.get(), &V0ToV1AdapterProcess::statusUpdate, status);
+}
+
+
+void V0ToV1Adapter::offerRescinded(
+ SchedulerDriver*,
+ const OfferID& offerId)
+{
+ process::dispatch(
+ process.get(), &V0ToV1AdapterProcess::offerRescinded, offerId);
+}
+
+
+void V0ToV1Adapter::resourceOffers(
+ SchedulerDriver*,
+ const vector<Offer>& offers)
+{
+ process::dispatch(
+ process.get(), &V0ToV1AdapterProcess::resourceOffers, offers);
+}
+
+
+void V0ToV1Adapter::registered(
+ SchedulerDriver*,
+ const FrameworkID &frameworkId,
+ const MasterInfo&)
+{
+ process::dispatch(
+ process.get(),
+ &V0ToV1AdapterProcess::registered,
+ frameworkId);
+}
+
+
+void V0ToV1Adapter::reregistered(
+ SchedulerDriver*,
+ const MasterInfo&)
+{
+ process::dispatch(process.get(), &V0ToV1AdapterProcess::reregistered);
+}
+
+
+void V0ToV1Adapter::disconnected(SchedulerDriver*)
+{
+ process::dispatch(process.get(), &V0ToV1AdapterProcess::disconnected);
+}
+
+
+void V0ToV1Adapter::send(const Call& call)
+{
+ process::dispatch(
+ process.get(), &V0ToV1AdapterProcess::send, driver.get(), call);
+}
+
+
+V0ToV1AdapterProcess::V0ToV1AdapterProcess(
+ JNIEnv* _env,
+ jweak _jmesos)
+ : ProcessBase(process::ID::generate("SchedulerV0ToV1Adapter")),
+ jvm(nullptr),
+ env(_env),
+ jmesos(_jmesos),
+ subscribeCall(false),
+ interval(DEFAULT_HEARTBEAT_INTERVAL)
+{
+ env->GetJavaVM(&jvm);
+}
+
+
+void V0ToV1AdapterProcess::registered(const FrameworkID& _frameworkId)
+{
+ jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+ jclass clazz = env->GetObjectClass(jmesos);
+
+ jfieldID scheduler =
+ env->GetFieldID(clazz, "scheduler",
+ "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+ jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+ clazz = env->GetObjectClass(jscheduler);
+
+ // scheduler.connected(mesos);
+ jmethodID connected =
+ env->GetMethodID(clazz, "connected",
+ "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+ env->ExceptionClear();
+
+ env->CallVoidMethod(jscheduler, connected, jmesos);
+
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ jvm->DetachCurrentThread();
+ ABORT("Exception thrown during `connected` call");
+ }
+
+ jvm->DetachCurrentThread();
+
+ // We need this copy to populate the fields in `Event::Subscribed` upon
+ // receiving a `reregistered()` callback later.
+ frameworkId = _frameworkId;
+
+ // These events are queued and delivered to the scheduler upon receiving the
+ // subscribe call later. See comments in `send()` for more details.
+ {
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+
+ Event::Subscribed* subscribed = event.mutable_subscribed();
+
+ subscribed->mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+
+ subscribed->set_heartbeat_interval_seconds(interval.secs());
+
+ received(event);
+ }
+
+ {
+ Event event;
+ event.set_type(Event::HEARTBEAT);
+
+ received(event);
+ }
+}
+
+
+void V0ToV1AdapterProcess::reregistered()
+{
+ CHECK_SOME(frameworkId);
+ registered(frameworkId.get());
+}
+
+
+// Handles a disconnection reported by the v0 driver: runs the adapter's own
+// `disconnect()` step and then notifies the Java scheduler by invoking
+// `scheduler.disconnected(mesos)`. Aborts if the Java callback throws.
+void V0ToV1AdapterProcess::disconnected()
+{
+  // NOTE(review): `disconnect()` is defined outside of this function;
+  // presumably it resets the adapter's local subscription state. Confirm
+  // against its definition.
+  disconnect();
+
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.disconnected(mesos);
+  jmethodID disconnected =
+    env->GetMethodID(clazz, "disconnected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  // The method ID was looked up on the scheduler's class and its signature
+  // takes the `Mesos` object as the only parameter, so it has to be invoked
+  // on `jscheduler` with `jmesos` as the argument (as `registered()` does
+  // for the `connected` callback).
+  env->CallVoidMethod(jscheduler, disconnected, jmesos);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `disconnected` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::resourceOffers(const vector<Offer>& _offers)
+{
+ Event event;
+ event.set_type(Event::OFFERS);
+
+ Event::Offers* offers = event.mutable_offers();
+
+ foreach (const Offer& offer, _offers) {
+ offers->add_offers()->CopyFrom(evolve(offer));
+ }
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::offerRescinded(const OfferID& offerId)
+{
+ Event event;
+ event.set_type(Event::RESCIND);
+
+ event.mutable_rescind()->mutable_offer_id()->
+ CopyFrom(evolve(offerId));
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::statusUpdate(const TaskStatus& status)
+{
+ Event event;
+ event.set_type(Event::UPDATE);
+
+ event.mutable_update()->mutable_status()->
+ CopyFrom(mesos::internal::evolve(status));
+
+ received(event);
+}
+
+
+// Translates a v0 `frameworkMessage` callback into a v1 `MESSAGE` event
+// carrying the evolved agent ID, the evolved executor ID and the message
+// payload, and hands it to `received()`.
+void V0ToV1AdapterProcess::frameworkMessage(
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ const string& data)
+{
+ Event event;
+ event.set_type(Event::MESSAGE);
+
+ event.mutable_message()->mutable_agent_id()->
+ CopyFrom(mesos::internal::evolve(slaveId));
+
+ event.mutable_message()->mutable_executor_id()->
+ CopyFrom(mesos::internal::evolve(executorId));
+
+ // Pass the `string` itself rather than `data.data()`: the `const char*`
+ // setter treats its argument as a NUL-terminated C string and would
+ // truncate a payload that contains embedded NUL bytes.
+ event.mutable_message()->set_data(data);
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::slaveLost(const SlaveID& slaveId)
+{
+ Event event;
+ event.set_type(Event::FAILURE);
+
+ event.mutable_failure()->mutable_agent_id()->
+ CopyFrom(mesos::internal::evolve(slaveId));
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::executorLost(
+ const ExecutorID& executorId,
+ const SlaveID& slaveId,
+ int status)
+{
+ Event event;
+ event.set_type(Event::FAILURE);
+
+ event.mutable_failure()->mutable_agent_id()->
+ CopyFrom(mesos::internal::evolve(slaveId));
+
+ event.mutable_failure()->mutable_executor_id()->
+ CopyFrom(mesos::internal::evolve(executorId));
+
+ event.mutable_failure()->set_status(status);
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::error(const string& message)
+{
+ Event event;
+ event.set_type(Event::ERROR);
+
+ event.mutable_error()->set_message(message);
+
+ received(event);
+}
+
+
+void V0ToV1AdapterProcess::send(SchedulerDriver* driver, const Call& _call)
+{
+ CHECK_NOTNULL(driver);
+
+ scheduler::Call call = devolve(_call);
+
+ Option<Error> error = validation::scheduler::call::validate(call);
+ if (error.isSome()) {
+ LOG(WARNING) << "Dropping " << call.type() << ": due to error "
+ << error->message;
+ return;
+ }
+
+ switch (call.type()) {
+ case Call::SUBSCRIBE: {
+ subscribeCall = true;
+
+ heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+
+ // The driver subscribes implicitly with the master upon initialization.
+ // For compatibility with the v1 interface, send the already enqueued
+ // subscribed event upon receiving the subscribe request.
+ _received();
+ break;
+ }
+
+ case Call::TEARDOWN: {
+ driver->stop(false);
+ break;
+ }
+
+ case Call::ACCEPT: {
+ vector<OfferID> offerIds;
+ foreach (const OfferID& offerId, call.accept().offer_ids()) {
+ offerIds.emplace_back(offerId);
+ }
+
+ vector<Offer::Operation> operations;
+ foreach (const Offer::Operation& operation, call.accept().operations()) {
+ operations.emplace_back(operation);
+ }
+
+ if (call.accept().has_filters()) {
+ driver->acceptOffers(offerIds, operations, call.accept().filters());
+ } else {
+ driver->acceptOffers(offerIds, operations);
+ }
+
+ break;
+ }
+
+ case Call::ACCEPT_INVERSE_OFFERS:
+ case Call::DECLINE_INVERSE_OFFERS:
+ case Call::SHUTDOWN: {
+ // TODO(anand): Throw java error.
+ LOG(ERROR) << "Received an unexpected " << call.type() << " call";
+ break;
+ }
+
+ case Call::DECLINE: {
+ foreach (const OfferID& offerId, call.decline().offer_ids()) {
+ if (call.decline().has_filters()) {
+ driver->declineOffer(offerId, call.decline().filters());
+ } else {
+ driver->declineOffer(offerId);
+ }
+ }
+
+ break;
+ }
+
+ case Call::REVIVE: {
+ driver->reviveOffers();
+ break;
+ }
+
+ case Call::KILL: {
+ driver->killTask(call.kill().task_id());
+ break;
+ }
+
+ case Call::ACKNOWLEDGE: {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
+ status.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
+ status.set_uuid(call.acknowledge().uuid());
+
+ driver->acknowledgeStatusUpdate(status);
+ break;
+ }
+
+ case Call::RECONCILE: {
+ vector<TaskStatus> statuses;
+
+ foreach (const scheduler::Call::Reconcile::Task& task,
+ call.reconcile().tasks()) {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ statuses.emplace_back(status);
+ }
+
+ driver->reconcileTasks(statuses);
+ break;
+ }
+
+ case Call::MESSAGE: {
+ driver->sendFrameworkMessage(
+ call.message().executor_id(),
+ call.message().slave_id(),
+ string(call.message().data()));
+ break;
+ }
+
+ case Call::REQUEST: {
+ vector<Request> requests;
+
+ foreach (const Request& request, call.request().requests()) {
+ requests.emplace_back(request);
+ }
+
+ driver->requestResources(requests);
+ break;
+ }
+
+ case Call::SUPPRESS: {
+ driver->suppressOffers();
+ break;
+ }
+
+ case Call::UNKNOWN: {
+ EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
+ << " call";
+ break;
+ }
+ }
+}
+
+
+// Enqueues `event` for delivery to the scheduler. For compatibility with the
+// v1 interface, events are only flushed once the scheduler has sent the
+// subscribe call; until then they simply accumulate in `pending`.
+void V0ToV1AdapterProcess::received(const Event& event)
+{
+ pending.push(event);
+
+ if (subscribeCall) {
+ _received();
+ }
+}
+
+
+void V0ToV1AdapterProcess::_received()
+{
+ CHECK(subscribeCall);
+
+ while (!pending.empty()) {
+ __received(pending.front());
+ pending.pop();
+ }
+}
+
+
+void V0ToV1AdapterProcess::__received(const Event& event)
+{
+ jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+ jclass clazz = env->GetObjectClass(jmesos);
+
+ jfieldID scheduler =
+ env->GetFieldID(clazz, "scheduler",
+ "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+ jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+ clazz = env->GetObjectClass(jscheduler);
+
+ // scheduler.received(mesos, event);
+ jmethodID received =
+ env->GetMethodID(clazz, "received",
+ "(Lorg/apache/mesos/v1/scheduler/Mesos;"
+ "Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
+
+ jobject jevent = convert<Event>(env, event);
+
+ env->ExceptionClear();
+
+ env->CallVoidMethod(jscheduler, received, jmesos, jevent);
+
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ jvm->DetachCurrentThread();
+ ABORT("Exception thrown during `received` call");
+ }
+
+ jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::heartbeat()
+{
+ // It is possible that we were unable to cancel this timer upon a
+ // disconnection. If this occurs, don't bother sending the heartbeat
+ // event.
+ if (heartbeatTimer.isNone() || !heartbeatTimer->timeout().expired()) {
+ return;
+ }
+
+ CHECK(subscribeCall)
+ << "Cannot send heartbeat events to the scheduler without receiving a "
+ << "subscribe call";
+
+ Event event;
+ event.set_type(Event::HEARTBEAT);
+
+ received(event);
+
+ heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+}
+
+
+void V0ToV1AdapterProcess::disconnect()
+{
+ // Upon noticing a disconnection with the master, we discard the pending
+ // events in the queue that were waiting to be sent to the scheduler
+ // upon receiving the subscribe call.
+ // It's fine to do so because:
+ // - Any outstanding offers are invalidated by the master upon a scheduler
+ // (re-)registration.
+ // - Any task status updates could be reconciled by the scheduler.
+ pending = queue<Event>();
+ subscribeCall = false;
+
+ if (heartbeatTimer.isSome()) {
+ Clock::cancel(heartbeatTimer.get());
+ heartbeatTimer = None();
+ }
+}
+
+
+extern "C" {
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_V0Mesos
+ * Method: initialize
+ * Signature: ()V
+ *
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_initialize
+ (JNIEnv* env, jobject thiz)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ // Create a weak global reference to the `V0Mesos` Java instance
+ // (global so that the native adapter can keep using it after this call
+ // returns; weak so that it does not keep the instance alive).
+ jweak jmesos = env->NewWeakGlobalRef(thiz);
+
+ // Get out the FrameworkInfo passed into the constructor.
+ jfieldID framework =
+ env->GetFieldID(clazz, "framework",
+ "Lorg/apache/mesos/v1/Protos$FrameworkInfo;");
+
+ jobject jframework = env->GetObjectField(thiz, framework);
+
+ // Get out the master passed into the constructor.
+ jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
+ jobject jmaster = env->GetObjectField(thiz, master);
+
+ // Get out the credential passed into the constructor.
+ jfieldID credential =
+ env->GetFieldID(clazz, "credential",
+ "Lorg/apache/mesos/v1/Protos$Credential;");
+
+ jobject jcredential = env->GetObjectField(thiz, credential);
+
+ // The credential is optional: a null Java field maps to `None`.
+ Option<Credential> credential_;
+ if (!env->IsSameObject(jcredential, nullptr)) {
+ credential_ = construct<Credential>(env, jcredential);
+ }
+
+ // Create the C++ adapter and store its address in the `__mesos` field so
+ // that the other native methods can retrieve it.
+ V0ToV1Adapter* mesos =
+ new V0ToV1Adapter(
+ env,
+ jmesos,
+ devolve(construct<v1::FrameworkInfo>(env, jframework)),
+ construct<string>(env, jmaster),
+ credential_);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+ env->SetLongField(thiz, __mesos, (jlong) mesos);
+}
+
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_V0Mesos
+ * Method: finalize
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_finalize
+ (JNIEnv* env, jobject thiz)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+ V0ToV1Adapter* mesos =
+ (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+ env->DeleteWeakGlobalRef(mesos->process->jmesos);
+
+ delete mesos;
+}
+
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_V0Mesos
+ * Method: send
+ * Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_send
+ (JNIEnv* env, jobject thiz, jobject jcall)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+ V0ToV1Adapter* mesos =
+ (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+ mesos->send(construct<Call>(env, jcall));
+}
+
+} // extern "C" {
[5/7] mesos git commit: Added v1 Scheduler/Mesos interface in Java.
Posted by an...@apache.org.
Added v1 Scheduler/Mesos interface in Java.
This change adds an interface for the v1 Scheduler + Mesos
interface that the schedulers can use to connect to Mesos.
Review: https://reviews.apache.org/r/50250
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1ba4a5a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1ba4a5a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1ba4a5a
Branch: refs/heads/master
Commit: c1ba4a5a2dc41e6b19b08ef8f62af8b62bc2a481
Parents: 07a1802
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:36:22 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 5 +-
.../org/apache/mesos/v1/scheduler/Mesos.java | 58 ++++++++++++++++++++
.../apache/mesos/v1/scheduler/Scheduler.java | 53 ++++++++++++++++++
3 files changed, 115 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 5eabc33..fa4dea2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1515,7 +1515,10 @@ MESOS_JAR_SOURCE = \
$(srcdir)/java/src/org/apache/mesos/state/LogState.java \
$(srcdir)/java/src/org/apache/mesos/state/State.java \
$(srcdir)/java/src/org/apache/mesos/state/Variable.java \
- $(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java
+ $(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java \
+ $(srcdir)/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java \
+ $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Mesos.java \
+ $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
MESOS_JAR_GENERATED = $(JAVA_PROTOS) $(V1_JAVA_PROTOS) \
java/generated/org/apache/mesos/MesosNativeLibrary.java
EXTRA_DIST += $(MESOS_JAR_SOURCE) \
http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java b/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
new file mode 100644
index 0000000..db0116a
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+
+/**
+ * Abstract interface for connecting a scheduler to Mesos. This interface
+ * is used to send Calls to Mesos (e.g., launch tasks, kill tasks, etc.).
+ */
+public interface Mesos {
+ /**
+ * Sends a Call to the Mesos master. The scheduler should only invoke this
+ * method once it has received the 'connected' callback. Otherwise, all calls
+ * would be dropped while disconnected.
+ * <p>
+ * Some local validation of calls is performed which may generate
+ * events without ever being sent to the master.
+ * <p>
+ * These comments are copied from include/mesos/v1/scheduler.hpp and should
+ * be kept in sync.
+ *
+ * @param call The Call to send to the master.
+ */
+ void send(Call call);
+
+ /**
+ * Force a reconnection with the Mesos master.
+ * <p>
+ * In the case of a one-way network partition, the connection between the
+ * scheduler and master might not necessarily break. If the scheduler detects
+ * a partition, due to lack of `HEARTBEAT` events (e.g., 5) within a time
+ * window, it can explicitly ask the library to force a reconnection with
+ * the master.
+ * <p>
+ * This call would be ignored if the scheduler is already disconnected with
+ * the master (e.g., no new master has been elected). Otherwise, the scheduler
+ * would get a 'disconnected' callback followed by a 'connected' callback.
+ * <p>
+ * These comments are copied from include/mesos/v1/scheduler.hpp and should
+ * be kept in sync.
+ */
+ void reconnect();
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java b/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
new file mode 100644
index 0000000..28efaff
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+/**
+ * Callback interface to be implemented by schedulers.
+ * Note that only one callback will be invoked at a time,
+ * so it is not recommended that you block within a callback because
+ * it may cause a deadlock.
+ * <p>
+ * Each callback includes a reference to the Mesos interface that was
+ * used to run this scheduler. The reference will not change for the
+ * duration of a scheduler from the time it is instantiated.
+ * This is intended for convenience so that a scheduler doesn't need to
+ * store a reference to the interface itself.
+ */
+
+public interface Scheduler {
+ /**
+ * Invoked when a connection is established with the master upon a
+ * master (re-)detection.
+ *
+ * @param mesos The Mesos interface that was used to run this scheduler.
+ */
+ void connected(Mesos mesos);
+
+ /**
+ * Invoked when no master is detected or when the existing persistent
+ * connection is interrupted.
+ *
+ * @param mesos The Mesos interface that was used to run this scheduler.
+ */
+ void disconnected(Mesos mesos);
+
+ /**
+ * Invoked when a new event is received from the Mesos master.
+ *
+ * @param mesos The Mesos interface that was used to run this scheduler.
+ * @param event The event received from the master.
+ */
+ void received(Mesos mesos, Event event);
+}
[2/7] mesos git commit: Added native implementation for v1 Mesos
interface.
Posted by an...@apache.org.
Added native implementation for v1 Mesos interface.
This change adds the native C++ implementation for the v1
Java class `JNIMesos` used for interacting with Mesos.
Review: https://reviews.apache.org/r/50252
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bca68f63
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bca68f63
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bca68f63
Branch: refs/heads/master
Commit: bca68f63aa04d91cdb44b8e100f364bf981d6bd2
Parents: 5855532
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:17:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 9 +-
.../org_apache_mesos_v1_scheduler_JNIMesos.cpp | 317 +++++++++++++++++++
2 files changed, 325 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bca68f63/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3a743e4..2b02b5f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1562,6 +1562,7 @@ libjava_la_SOURCES = \
java/jni/org_apache_mesos_state_LogState.cpp \
java/jni/org_apache_mesos_state_Variable.cpp \
java/jni/org_apache_mesos_state_ZooKeeperState.cpp \
+ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp \
jvm/jvm.cpp \
jvm/jvm.hpp \
jvm/java/io.hpp \
@@ -1603,7 +1604,8 @@ nodist_libjava_la_SOURCES = \
java/jni/org_apache_mesos_state_LevelDBState.h \
java/jni/org_apache_mesos_state_LogState.h \
java/jni/org_apache_mesos_state_Variable.h \
- java/jni/org_apache_mesos_state_ZooKeeperState.h
+ java/jni/org_apache_mesos_state_ZooKeeperState.h \
+ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h
BUILT_SOURCES += $(nodist_libjava_la_SOURCES)
@@ -1647,6 +1649,11 @@ java/jni/org_apache_mesos_state_ZooKeeperState.h: $(MESOS_JAR)
-classpath $(MESOS_JAR):@PROTOBUF_JAR@ \
org.apache.mesos.state.ZooKeeperState
+java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h: $(MESOS_JAR)
+ $(JAVA_HOME)/bin/javah -d java/jni \
+ -classpath $(MESOS_JAR):@PROTOBUF_JAR@ \
+ org.apache.mesos.v1.scheduler.JNIMesos
+
$(EXAMPLES_JAR): $(EXAMPLES_SOURCE)
@echo "Building examples.jar ..."
$(MKDIR_P) examples/java
http://git-wip-us.apache.org/repos/asf/mesos/blob/bca68f63/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
new file mode 100644
index 0000000..e197b37
--- /dev/null
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+#include <vector>
+
+#include <process/owned.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "jvm/jvm.hpp"
+
+#include "construct.hpp"
+#include "convert.hpp"
+#include "org_apache_mesos_v1_scheduler_JNIMesos.h"
+
+using namespace mesos::v1::scheduler;
+
+using mesos::v1::Credential;
+
+using process::Owned;
+
+using std::string;
+using std::vector;
+
+namespace v1 {
+
+class JNIMesos
+{
+public:
+ JNIMesos(
+ JNIEnv* _env,
+ jweak _jmesos,
+ const string& master,
+ const Option<Credential>& credential)
+ : jvm(nullptr), env(_env), jmesos(_jmesos)
+ {
+ env->GetJavaVM(&jvm);
+
+ mesos.reset(
+ new Mesos(master,
+ mesos::ContentType::PROTOBUF,
+ std::bind(&JNIMesos::connected, this),
+ std::bind(&JNIMesos::disconnected, this),
+ std::bind(&JNIMesos::received_, this, lambda::_1),
+ credential));
+ }
+
+ virtual ~JNIMesos() = default;
+
+ virtual void connected();
+ virtual void disconnected();
+ virtual void received(const Event& event);
+
+ void received_(std::queue<Event> events) {
+ while (!events.empty()) {
+ received(events.front());
+ events.pop();
+ }
+ }
+
+ JavaVM* jvm;
+ JNIEnv* env;
+ jweak jmesos;
+
+ Owned<Mesos> mesos;
+};
+
+
+void JNIMesos::connected()
+{
+ jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+ jclass clazz = env->GetObjectClass(jmesos);
+
+ jfieldID scheduler =
+ env->GetFieldID(clazz, "scheduler",
+ "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+ jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+ clazz = env->GetObjectClass(jscheduler);
+
+ // scheduler.connected(mesos);
+ jmethodID connected =
+ env->GetMethodID(clazz, "connected",
+ "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+ env->ExceptionClear();
+
+ env->CallVoidMethod(jscheduler, connected, jmesos);
+
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ jvm->DetachCurrentThread();
+ ABORT("Exception thrown during `connected` call");
+ }
+
+ jvm->DetachCurrentThread();
+}
+
+
+void JNIMesos::disconnected()
+{
+ jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+ jclass clazz = env->GetObjectClass(jmesos);
+
+ jfieldID scheduler =
+ env->GetFieldID(clazz, "scheduler",
+ "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+ jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+ clazz = env->GetObjectClass(jscheduler);
+
+ // scheduler.disconnected(mesos);
+ jmethodID disconnected =
+ env->GetMethodID(clazz, "disconnected",
+ "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+ env->ExceptionClear();
+
+ env->CallVoidMethod(jscheduler, disconnected, jmesos);
+
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ jvm->DetachCurrentThread();
+ ABORT("Exception thrown during `disconnected` call");
+ }
+
+ jvm->DetachCurrentThread();
+}
+
+
+void JNIMesos::received(const Event& event)
+{
+ jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+ jclass clazz = env->GetObjectClass(jmesos);
+
+ jfieldID scheduler =
+ env->GetFieldID(clazz, "scheduler",
+ "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+ jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+ clazz = env->GetObjectClass(jscheduler);
+
+ // scheduler.received(mesos, event);
+ jmethodID received =
+ env->GetMethodID(clazz, "received",
+ "(Lorg/apache/mesos/v1/scheduler/Mesos;"
+ "Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
+
+ jobject jevent = convert<Event>(env, event);
+
+ env->ExceptionClear();
+
+ env->CallVoidMethod(jscheduler, received, jmesos, jevent);
+
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ jvm->DetachCurrentThread();
+ ABORT("Exception thrown during `received` call");
+ }
+
+ jvm->DetachCurrentThread();
+}
+
+} // namespace v1 {
+
+
+extern "C" {
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_JNIMesos
+ * Method: initialize
+ * Signature: ()V
+ *
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_initialize
+ (JNIEnv* env, jobject thiz)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ // Create a weak global reference to the `JNIMesos` Java instance
+ // (global so that the native callbacks can keep using it after this call
+ // returns; weak so that it does not keep the instance alive).
+ jweak jmesos = env->NewWeakGlobalRef(thiz);
+
+ // Get out the master passed into the constructor.
+ jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
+ jobject jmaster = env->GetObjectField(thiz, master);
+
+ // Get out the credential passed into the constructor.
+ jfieldID credential =
+ env->GetFieldID(clazz, "credential",
+ "Lorg/apache/mesos/v1/Protos$Credential;");
+
+ jobject jcredential = env->GetObjectField(thiz, credential);
+
+ // The credential is optional: a null Java field maps to `None`.
+ Option<Credential> credential_;
+ if (!env->IsSameObject(jcredential, nullptr)) {
+ credential_ = construct<Credential>(env, jcredential);
+ }
+
+ // Create the C++ wrapper and store its address in the `__mesos` field so
+ // that the other native methods can retrieve it.
+ v1::JNIMesos* mesos =
+ new v1::JNIMesos(env, jmesos, construct<string>(env, jmaster), credential_);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+ env->SetLongField(thiz, __mesos, (jlong) mesos);
+}
+
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_JNIMesos
+ * Method: finalize
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_finalize
+ (JNIEnv* env, jobject thiz)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+ v1::JNIMesos* mesos =
+ (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+ env->DeleteWeakGlobalRef(mesos->jmesos);
+
+ delete mesos;
+}
+
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_JNIMesos
+ * Method: send
+ * Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_send
+ (JNIEnv* env, jobject thiz, jobject jcall)
+{
+ // Construct a C++ Call from the Java Call.
+ const Call& call = construct<Call>(env, jcall);
+
+ jclass clazz = env->GetObjectClass(thiz);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+ v1::JNIMesos* mesos =
+ (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+ // It is possible that `mesos` might not be initialized in some cases due to
+ // a possible race condition. See MESOS-5926 for more details.
+ if (mesos->mesos.get() == nullptr) {
+ LOG(WARNING) << "Ignoring call " << call.type() << " as the library has "
+ << "not been initialized yet";
+ return;
+ }
+
+ mesos->mesos->send(call);
+}
+
+
+/*
+ * Class: org_apache_mesos_v1_scheduler_JNIMesos
+ * Method: reconnect
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_reconnect
+ (JNIEnv* env, jobject thiz)
+{
+ jclass clazz = env->GetObjectClass(thiz);
+
+ jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+ v1::JNIMesos* mesos =
+ (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+ // It is possible that `mesos` might not be initialized in some cases due to
+ // a possible race condition. See MESOS-5926 for more details.
+ if (mesos->mesos.get() == nullptr) {
+ LOG(WARNING) << "Ignoring the reconnect request as the library has not "
+ << "been initialized yet";
+ return;
+ }
+
+ mesos->mesos->reconnect();
+}
+
+} // extern "C" {
[4/7] mesos git commit: Added an abstract base class for scheduler
library.
Posted by an...@apache.org.
Added an abstract base class for scheduler library.
This change adds an abstract base class `MesosBase` that
would be used by implementations for interacting with
Mesos via the Scheduler API. This is needed later for
implementing the V0->V1 Scheduler Adapter.
Review: https://reviews.apache.org/r/50247
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e184216b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e184216b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e184216b
Branch: refs/heads/master
Commit: e184216b299424fc6b7b241fd9caeb7e2f209306
Parents: 9567fb4
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 17:28:27 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
include/mesos/v1/scheduler.hpp | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e184216b/include/mesos/v1/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp
index 18e7a95..d56e088 100644
--- a/include/mesos/v1/scheduler.hpp
+++ b/include/mesos/v1/scheduler.hpp
@@ -41,8 +41,19 @@ namespace scheduler {
class MesosProcess; // Forward declaration.
-// Interface to Mesos for a scheduler. Abstracts master detection
-// (connection and disconnection).
+// Abstract interface for connecting a scheduler to Mesos.
+class MesosBase
+{
+public:
+ // Empty virtual destructor (necessary to instantiate subclasses).
+ virtual ~MesosBase() {}
+ virtual void send(const Call& call) = 0;
+ virtual void reconnect() = 0;
+};
+
+
+// Concrete implementation that connects a scheduler to a Mesos master.
+// Abstracts master detection (connection and disconnection).
//
// Expects three callbacks, 'connected', 'disconnected', and
// 'received' which will get invoked _serially_ when it's determined
@@ -51,7 +62,7 @@ class MesosProcess; // Forward declaration.
// The library reconnects with the master upon a disconnection.
//
// NOTE: All calls and events are dropped while disconnected.
-class Mesos
+class Mesos : public MesosBase
{
public:
// The credential will be used for authenticating with the master. Currently,
@@ -81,7 +92,7 @@ public:
// events without ever being sent to the master. This includes when
// calls are sent but no master is currently detected (i.e., we're
// disconnected).
- virtual void send(const Call& call);
+ virtual void send(const Call& call) override;
// Force a reconnection with the master.
//
@@ -94,7 +105,7 @@ public:
// This call would be ignored if the scheduler is already disconnected with
// the master (e.g., no new master has been elected). Otherwise, the scheduler
// would get a 'disconnected' callback followed by a 'connected' callback.
- virtual void reconnect();
+ virtual void reconnect() override;
protected:
// NOTE: This constructor is used for testing.
[3/7] mesos git commit: Added helper functions for v1 JNI
`construct()`/`convert()`.
Posted by an...@apache.org.
Added helper functions for v1 JNI `construct()`/`convert()`.
These would be used later in the chain for making JNI
calls to the v1 Mesos implementation in native code.
Review: https://reviews.apache.org/r/50248
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/07a1802f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/07a1802f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/07a1802f
Branch: refs/heads/master
Commit: 07a1802f1a6c1b7b3b778bb86d17254842322e5c
Parents: e184216
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:07:41 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
src/java/jni/construct.cpp | 67 +++++++++++++++++++++++++++++++++++++++++
src/java/jni/convert.cpp | 26 ++++++++++++++++
2 files changed, 93 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/07a1802f/src/java/jni/construct.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp
index e5c070c..2645d9e 100644
--- a/src/java/jni/construct.cpp
+++ b/src/java/jni/construct.cpp
@@ -25,6 +25,10 @@
#include <mesos/mesos.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
#include "construct.hpp"
using namespace mesos;
@@ -399,3 +403,66 @@ Offer::Operation construct(JNIEnv* env, jobject jobj)
return operation;
}
+
+
+// Builds a C++ `v1::Credential` from its Java protobuf counterpart by
+// serializing the Java object to bytes and parsing those bytes.
+template <>
+v1::Credential construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata =
+    static_cast<jbyteArray>(env->CallObjectMethod(jobj, toByteArray));
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  // Hold the parsed message by value (not through a const reference) so
+  // that the return below can be elided/moved rather than copied.
+  v1::Credential credential = parse<v1::Credential>(data, length);
+
+  // The bytes were only read, so release with `JNI_ABORT`: mode `0`
+  // would copy the (unchanged) buffer back into the Java array.
+  env->ReleaseByteArrayElements(jdata, data, JNI_ABORT);
+
+  return credential;
+}
+
+
+// Builds a C++ `v1::FrameworkInfo` from its Java protobuf counterpart by
+// serializing the Java object to bytes and parsing those bytes.
+template <>
+v1::FrameworkInfo construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata =
+    static_cast<jbyteArray>(env->CallObjectMethod(jobj, toByteArray));
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  // Hold the parsed message by value (not through a const reference) so
+  // that the return below can be elided/moved rather than copied.
+  v1::FrameworkInfo framework = parse<v1::FrameworkInfo>(data, length);
+
+  // The bytes were only read, so release with `JNI_ABORT`: mode `0`
+  // would copy the (unchanged) buffer back into the Java array.
+  env->ReleaseByteArrayElements(jdata, data, JNI_ABORT);
+
+  return framework;
+}
+
+
+// Builds a C++ `v1::scheduler::Call` from its Java protobuf counterpart
+// by serializing the Java object to bytes and parsing those bytes.
+template <>
+v1::scheduler::Call construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata =
+    static_cast<jbyteArray>(env->CallObjectMethod(jobj, toByteArray));
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  // Hold the parsed message by value (not through a const reference) so
+  // that the return below can be elided/moved rather than copied.
+  v1::scheduler::Call call = parse<v1::scheduler::Call>(data, length);
+
+  // The bytes were only read, so release with `JNI_ABORT`: mode `0`
+  // would copy the (unchanged) buffer back into the Java array.
+  env->ReleaseByteArrayElements(jdata, data, JNI_ABORT);
+
+  return call;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/07a1802f/src/java/jni/convert.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/convert.cpp b/src/java/jni/convert.cpp
index 45ff488..338eb96 100644
--- a/src/java/jni/convert.cpp
+++ b/src/java/jni/convert.cpp
@@ -21,6 +21,8 @@
#include <mesos/mesos.hpp>
+#include <mesos/v1/scheduler/scheduler.hpp>
+
#include <stout/result.hpp>
#include <stout/strings.hpp>
@@ -488,6 +490,30 @@ jobject convert(JNIEnv* env, const Status& status)
}
+// Converts a C++ `v1::scheduler::Event` into the equivalent Java protobuf
+// object by round-tripping it through its serialized bytes.
+template <>
+jobject convert(JNIEnv* env, const v1::scheduler::Event& event)
+{
+  string serialized;
+  event.SerializeToString(&serialized);
+
+  // byte[] bytes = ..;
+  jbyteArray jbytes = env->NewByteArray(serialized.size());
+  env->SetByteArrayRegion(
+      jbytes,
+      0,
+      serialized.size(),
+      reinterpret_cast<const jbyte*>(serialized.data()));
+
+  // return Event.parseFrom(bytes);
+  jclass eventClass =
+    FindMesosClass(env, "org/apache/mesos/v1/scheduler/Protos$Event");
+
+  jmethodID parseFrom = env->GetStaticMethodID(
+      eventClass,
+      "parseFrom",
+      "([B)Lorg/apache/mesos/v1/scheduler/Protos$Event;");
+
+  return env->CallStaticObjectMethod(eventClass, parseFrom, jbytes);
+}
+
+
// Helper to safely return the 'jfieldID' of the given 'name'
// and 'signature'. If the field doesn't exist 'None' is
// returned. If any other JVM Exception is encountered an 'Error'
[7/7] mesos git commit: Added java implementations for the V0/V1
implementation for Mesos.
Posted by an...@apache.org.
Added java implementations for the V0/V1 implementation for Mesos.
Review: https://reviews.apache.org/r/50251
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58555326
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58555326
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58555326
Branch: refs/heads/master
Commit: 5855532668ab2a5f23914b26ccee70d7a5068302
Parents: c1ba4a5
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:56:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 3 +-
.../org/apache/mesos/v1/scheduler/JNIMesos.java | 84 ++++++++++++++++++++
.../org/apache/mesos/v1/scheduler/V0Mesos.java | 84 ++++++++++++++++++++
3 files changed, 170 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fa4dea2..3a743e4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1518,7 +1518,8 @@ MESOS_JAR_SOURCE = \
$(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java \
$(srcdir)/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java \
$(srcdir)/java/src/org/apache/mesos/v1/scheduler/Mesos.java \
- $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
+ $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java \
+ $(srcdir)/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
MESOS_JAR_GENERATED = $(JAVA_PROTOS) $(V1_JAVA_PROTOS) \
java/generated/org/apache/mesos/MesosNativeLibrary.java
EXTRA_DIST += $(MESOS_JAR_SOURCE) \
http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java b/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
new file mode 100644
index 0000000..fc804a4
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.MesosNativeLibrary;
+
+import org.apache.mesos.v1.Protos.Credential;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+
+/**
+ * A {@link Mesos} implementation that links a {@link Scheduler} to a Mesos
+ * master. This class is thread-safe.
+ * <p>
+ * Under the hood this relies on the scheduler library
+ * (src/scheduler/scheduler.cpp), which is built on the V1 Mesos Scheduler
+ * API. That library invokes the Scheduler callbacks as it communicates
+ * with the Mesos master.
+ * <p>
+ * The scheduler library does its own logging through GLOG. GLOG flags can
+ * be supplied as environment variables by prefixing the flag name with
+ * "GLOG_", e.g., "GLOG_v=1". Mesos specific logging flags are listed in
+ * src/logging/flags.hpp; Mesos flags can likewise be supplied as
+ * environment variables by prefixing the flag name with "MESOS_", e.g.,
+ * "MESOS_QUIET=1".
+ * <p>
+ * See src/examples/java/V1TestFramework.java for an example of using this.
+ */
+public class JNIMesos implements Mesos {
+  static {
+    MesosNativeLibrary.load();
+  }
+
+  // These fields are never read from Java code in this class; presumably
+  // the native methods access them by name, so do not rename them.
+  private final Scheduler scheduler;
+  private final String master;
+  private final Credential credential;
+
+  // Never assigned from Java code in this class; presumably managed by
+  // the native methods.
+  private long __mesos;
+
+  /**
+   * Creates an instance without a credential.
+   *
+   * @param scheduler the scheduler to connect; must not be null.
+   * @param master the master to connect to; must not be null.
+   * @throws NullPointerException if scheduler or master is null.
+   */
+  public JNIMesos(Scheduler scheduler, String master) {
+    this(scheduler, master, null);
+  }
+
+  /**
+   * Creates an instance and runs the native initialization.
+   *
+   * @param scheduler the scheduler to connect; must not be null.
+   * @param master the master to connect to; must not be null.
+   * @param credential the credential to use; may be null.
+   * @throws NullPointerException if scheduler or master is null.
+   */
+  public JNIMesos(Scheduler scheduler, String master, Credential credential) {
+    ensureNotNull(scheduler, "Not expecting a null scheduler");
+    ensureNotNull(master, "Not expecting a null master");
+
+    this.scheduler = scheduler;
+    this.master = master;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  @Override
+  public native void send(Call call);
+
+  @Override
+  public native void reconnect();
+
+  protected native void initialize();
+  protected native void finalize();
+
+  // Throws a NullPointerException carrying 'message' when 'value' is null.
+  private static void ensureNotNull(Object value, String message) {
+    if (value == null) {
+      throw new NullPointerException(message);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java b/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
new file mode 100644
index 0000000..fc262e1
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.MesosNativeLibrary;
+
+import org.apache.mesos.v1.Protos.Credential;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+/**
+ * Adapter that exposes the v0 (driver + scheduler) implementation through
+ * the v1 Scheduler interface. Interaction with Mesos is carried out by the
+ * MesosSchedulerDriver under the hood. This class is thread-safe.
+ */
+public class V0Mesos implements Mesos {
+  static {
+    MesosNativeLibrary.load();
+  }
+
+  // These fields are never read from Java code in this class; presumably
+  // the native methods access them by name, so do not rename them.
+  private final Scheduler scheduler;
+  private final FrameworkInfo framework;
+  private final String master;
+  private final Credential credential;
+
+  // Never assigned from Java code in this class; presumably managed by
+  // the native methods.
+  private long __mesos;
+
+  /**
+   * Creates an instance without a credential.
+   *
+   * @param scheduler the scheduler to connect; must not be null.
+   * @param framework the framework description; must not be null.
+   * @param master the master to connect to; must not be null.
+   * @throws NullPointerException if scheduler, framework or master is null.
+   */
+  public V0Mesos(Scheduler scheduler, FrameworkInfo framework, String master) {
+    this(scheduler, framework, master, null);
+  }
+
+  /**
+   * Creates an instance and runs the native initialization.
+   *
+   * @param scheduler the scheduler to connect; must not be null.
+   * @param framework the framework description; must not be null.
+   * @param master the master to connect to; must not be null.
+   * @param credential the credential to use; may be null.
+   * @throws NullPointerException if scheduler, framework or master is null.
+   */
+  public V0Mesos(Scheduler scheduler,
+                 FrameworkInfo framework,
+                 String master,
+                 Credential credential) {
+    ensureNotNull(scheduler, "Not expecting a null scheduler");
+    ensureNotNull(framework, "Not expecting a null framework");
+    ensureNotNull(master, "Not expecting a null master");
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  @Override
+  public native void send(Call call);
+
+  // This is currently a no-op for the driver as it does not expose semantics
+  // to force reconnection.
+  @Override
+  public void reconnect() {}
+
+  protected native void initialize();
+  protected native void finalize();
+
+  // Throws a NullPointerException carrying 'message' when 'value' is null.
+  private static void ensureNotNull(Object value, String message) {
+    if (value == null) {
+      throw new NullPointerException(message);
+    }
+  }
+}