You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/02 15:42:43 UTC

[1/7] mesos git commit: Added an example test for the V0/V1 Mesos java implementation.

Repository: mesos
Updated Branches:
  refs/heads/master 9567fb420 -> 74ceb051c


Added an example test for the V0/V1 Mesos java implementation.

Review: https://reviews.apache.org/r/50254


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/74ceb051
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/74ceb051
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/74ceb051

Branch: refs/heads/master
Commit: 74ceb051c454326ace02a7c670c0ea4d911519c2
Parents: 68c27d1
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:36:34 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 configure.ac                           |   2 +
 src/Makefile.am                        |   8 +-
 src/examples/java/V1TestFramework.java | 386 ++++++++++++++++++++++++++++
 src/examples/java/v1-test-framework.in |  39 +++
 src/tests/examples_tests.cpp           |   4 +
 src/tests/java_v0_framework_test.sh    |  41 +++
 src/tests/java_v1_framework_test.sh    |  40 +++
 7 files changed, 518 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index d213690..f82c631 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1069,6 +1069,8 @@ __EOF__
                   [chmod +x src/examples/java/test-multiple-executors-framework])
   AC_CONFIG_FILES([src/examples/java/test-log],
                   [chmod +x src/examples/java/test-log])
+  AC_CONFIG_FILES([src/examples/java/v1-test-framework],
+                  [chmod +x src/examples/java/v1-test-framework])
   AC_CONFIG_FILES([src/java/mesos.pom])
 
   AC_DEFINE([MESOS_HAS_JAVA])

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b9cc040..30d8f72 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1532,7 +1532,8 @@ EXAMPLES_SOURCE =							\
   $(srcdir)/examples/java/TestExecutor.java				\
   $(srcdir)/examples/java/TestFramework.java				\
   $(srcdir)/examples/java/TestLog.java					\
-  $(srcdir)/examples/java/TestMultipleExecutorsFramework.java
+  $(srcdir)/examples/java/TestMultipleExecutorsFramework.java		\
+  $(srcdir)/examples/java/V1TestFramework.java
 EXTRA_DIST += $(EXAMPLES_SOURCE)
 
 
@@ -2236,7 +2237,8 @@ EXAMPLESCRIPTSJAVA =						\
   examples/java/test-exception-framework			\
   examples/java/test-framework					\
   examples/java/test-log					\
-  examples/java/test-multiple-executors-framework
+  examples/java/test-multiple-executors-framework		\
+  examples/java/v1-test-framework
 
 check_SCRIPTS += $(EXAMPLESCRIPTSJAVA)
 mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSJAVA)
@@ -2270,6 +2272,8 @@ dist_check_SCRIPTS +=						\
   tests/java_exception_test.sh					\
   tests/java_framework_test.sh					\
   tests/java_log_test.sh					\
+  tests/java_v0_framework_test.sh				\
+  tests/java_v1_framework_test.sh				\
   tests/no_executor_framework_test.sh				\
   tests/persistent_volume_framework_test.sh			\
   tests/python_framework_test.sh				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/V1TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/V1TestFramework.java b/src/examples/java/V1TestFramework.java
new file mode 100644
index 0000000..a41edbc
--- /dev/null
+++ b/src/examples/java/V1TestFramework.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import java.util.concurrent.locks.*;
+
+import org.apache.mesos.v1.*;
+
+import org.apache.mesos.v1.Protos.*;
+
+import org.apache.mesos.v1.scheduler.JNIMesos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.apache.mesos.v1.scheduler.V0Mesos;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+public class V1TestFramework {
+  static class TestScheduler implements Scheduler {
+
+    public TestScheduler(
+        String master,
+        FrameworkInfo framework,
+        ExecutorInfo executor) {
+      this(master, framework, executor, 5);
+    }
+
+    public TestScheduler(
+        String master,
+        FrameworkInfo framework,
+        ExecutorInfo executor,
+        int totalTasks) {
+      this.framework = framework;
+      this.executor = executor;
+      this.totalTasks = totalTasks;
+      this.state = State.DISCONNECTED;
+    }
+
+    // TODO(anand): Synchronize on `state` instead.
+    @Override
+    public synchronized void connected(final Mesos mesos) {
+      System.out.println("Connected");
+
+      state = State.CONNECTED;
+
+      retryTimer = new Timer();
+      retryTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          doReliableRegistration(mesos);
+        }
+      }, 0, 1000); // Repeat every 1 second
+    }
+
+    @Override
+    public synchronized void disconnected(Mesos mesos) {
+      System.out.println("Disconnected");
+
+      state = state.DISCONNECTED;
+      cancelRetryTimer();
+    }
+
+    @Override
+    public synchronized void received(Mesos mesos, Event event) {
+      switch (event.getType()) {
+        case SUBSCRIBED: {
+          frameworkId = event.getSubscribed().getFrameworkId();
+          state = State.SUBSCRIBED;
+
+          System.out.println("Subscribed with ID " + frameworkId);
+          break;
+        }
+
+        case OFFERS: {
+          System.out.println("Received an OFFERS event");
+
+          offers(mesos, event.getOffers().getOffersList());
+          break;
+        }
+
+        case RESCIND: {
+          System.out.println("Received an RESCIND event");
+          break;
+        }
+
+        case UPDATE: {
+          System.out.println("Received an UPDATE event");
+
+          update(mesos, event.getUpdate().getStatus());
+          break;
+        }
+
+        case MESSAGE: {
+          System.out.println("Received a MESSAGE event");
+          break;
+        }
+
+        case FAILURE: {
+          System.out.println("Received a FAILURE event");
+          break;
+        }
+
+        case ERROR: {
+          System.out.println("Received an ERROR event");
+          System.exit(1);
+        }
+
+        case HEARTBEAT: {
+          // TODO(anand): Force reconnection with the master upon lack
+          // of heartbeats.
+          System.out.println("Received a HEARTBEAT event");
+          break;
+        }
+
+        case UNKNOWN: {
+          System.out.println("Received an UNKNOWN event");
+          break;
+        }
+      }
+    }
+
+    public synchronized void doReliableRegistration(Mesos mesos) {
+      if (state == State.SUBSCRIBED || state == State.DISCONNECTED) {
+        cancelRetryTimer();
+        return;
+      }
+
+      Call.Builder callBuilder = Call.newBuilder()
+          .setType(Call.Type.SUBSCRIBE)
+          .setSubscribe(Call.Subscribe.newBuilder()
+            .setFrameworkInfo(framework)
+            .build());
+
+      mesos.send(callBuilder.build());
+    }
+
+    private void cancelRetryTimer() {
+      // Cancel previously active timer (if one exists).
+      if (retryTimer != null) {
+        retryTimer.cancel();
+        retryTimer.purge();
+      }
+
+      retryTimer = null;
+    }
+
+    public void offers(Mesos mesos, List<Offer> offers) {
+      double CPUS_PER_TASK = 1;
+      double MEM_PER_TASK = 128;
+
+      for (Offer offer : offers) {
+        Offer.Operation.Launch.Builder launch =
+          Offer.Operation.Launch.newBuilder();
+
+        double offerCpus = 0;
+        double offerMem = 0;
+        for (Resource resource : offer.getResourcesList()) {
+          if (resource.getName().equals("cpus")) {
+            offerCpus += resource.getScalar().getValue();
+          } else if (resource.getName().equals("mem")) {
+            offerMem += resource.getScalar().getValue();
+          }
+        }
+
+        System.out.println(
+            "Received offer " + offer.getId().getValue() + " with cpus: " +
+            offerCpus + " and mem: " + offerMem);
+
+        double remainingCpus = offerCpus;
+        double remainingMem = offerMem;
+        while (launchedTasks < totalTasks &&
+               remainingCpus >= CPUS_PER_TASK &&
+               remainingMem >= MEM_PER_TASK) {
+          TaskID taskId = TaskID.newBuilder()
+            .setValue(Integer.toString(launchedTasks++)).build();
+
+          System.out.println("Launching task " + taskId.getValue() +
+                             " using offer " + offer.getId().getValue());
+
+          TaskInfo task = TaskInfo.newBuilder()
+            .setName("task " + taskId.getValue())
+            .setTaskId(taskId)
+            .setAgentId(offer.getAgentId())
+            .addResources(Resource.newBuilder()
+                          .setName("cpus")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder()
+                            .setValue(CPUS_PER_TASK)
+                            .build()))
+            .addResources(Resource.newBuilder()
+                          .setName("mem")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder()
+                            .setValue(MEM_PER_TASK)
+                            .build()))
+            .setExecutor(ExecutorInfo.newBuilder(executor))
+            .build();
+
+          launch.addTaskInfos(TaskInfo.newBuilder(task));
+
+          remainingCpus -= CPUS_PER_TASK;
+          remainingMem -= MEM_PER_TASK;
+        }
+
+        mesos.send(Call.newBuilder()
+          .setType(Call.Type.ACCEPT)
+          .setFrameworkId(frameworkId)
+          .setAccept(Call.Accept.newBuilder()
+            .addOfferIds(offer.getId())
+            .addOperations(Offer.Operation.newBuilder()
+              .setType(Offer.Operation.Type.LAUNCH)
+              .setLaunch(launch)
+              .build())
+            .setFilters(Filters.newBuilder()
+              .setRefuseSeconds(1)
+              .build()))
+          .build());
+      }
+    }
+
+    public void update(Mesos mesos, TaskStatus status) {
+      System.out.println(
+          "Status update: task " + status.getTaskId().getValue() +
+          " is in state " + status.getState().getValueDescriptor().getName());
+
+      if (status.getState() == TaskState.TASK_FINISHED) {
+        finishedTasks++;
+        System.out.println("Finished tasks: " + finishedTasks);
+        if (finishedTasks == totalTasks) {
+          lock.lock();
+          try {
+            finished = true;
+            finishedCondtion.signal();
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+
+      if (status.getState() == TaskState.TASK_LOST ||
+          status.getState() == TaskState.TASK_KILLED ||
+          status.getState() == TaskState.TASK_FAILED) {
+        System.err.println(
+            "Aborting because task " + status.getTaskId().getValue() +
+            " is in unexpected state " +
+            status.getState().getValueDescriptor().getName() +
+            " with reason '" +
+            status.getReason().getValueDescriptor().getName() + "'" +
+            " from source '" +
+            status.getSource().getValueDescriptor().getName() + "'" +
+            " with message '" + status.getMessage() + "'");
+
+        System.exit(1);
+      }
+
+      mesos.send(Call.newBuilder()
+        .setType(Call.Type.ACKNOWLEDGE)
+        .setFrameworkId(frameworkId)
+        .setAcknowledge(Call.Acknowledge.newBuilder()
+          .setAgentId(status.getAgentId())
+          .setTaskId(status.getTaskId())
+          .setUuid(status.getUuid())
+          .build())
+        .build());
+    }
+
+    private enum State {
+      DISCONNECTED,
+      CONNECTED,
+      SUBSCRIBED
+    }
+
+    private FrameworkInfo framework;
+    private FrameworkID frameworkId;
+    private final ExecutorInfo executor;
+    private final int totalTasks;
+    private int launchedTasks = 0;
+    private int finishedTasks = 0;
+    private State state;
+    private Timer retryTimer = null;
+  }
+
+  private static void usage() {
+    String name = V1TestFramework.class.getName();
+    System.err.println("Usage: " + name + " master version{0,1}");
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2 || args.length > 3) {
+      usage();
+      System.exit(1);
+    }
+
+    int version = Integer.parseInt(args[1]);
+    if (version != 0 && version != 1) {
+      usage();
+      System.exit(1);
+    }
+
+    String uri = new File("./test-executor").getCanonicalPath();
+
+    ExecutorInfo executor = ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue("default"))
+      .setCommand(CommandInfo.newBuilder().setValue(uri))
+      .setName("Test Executor (Java)")
+      .setSource("java_test")
+      .build();
+
+    FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
+        .setUser(System.getProperty("user.name", "default-user"))
+        .setName("V" + version + " Test Framework (Java)");
+
+    Credential.Builder credentialBuilder = null;
+
+    if (System.getenv("DEFAULT_PRINCIPAL") != null) {
+      frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL"));
+
+      if (System.getenv("DEFAULT_SECRET") != null) {
+        credentialBuilder = Credential.newBuilder()
+          .setPrincipal(System.getenv("DEFAULT_PRINCIPAL"))
+          .setSecret(System.getenv("DEFAULT_SECRET"));
+      }
+    }
+
+    Scheduler scheduler = args.length == 2
+      ? new TestScheduler(args[0], frameworkBuilder.build(), executor)
+      : new TestScheduler(args[0],
+                          frameworkBuilder.build(),
+                          executor,
+                          Integer.parseInt(args[2]));
+
+    Mesos mesos;
+    if (credentialBuilder != null) {
+      mesos = version == 1
+                ? new JNIMesos(scheduler, args[0], credentialBuilder.build())
+                : new V0Mesos(scheduler,
+                              frameworkBuilder.build(),
+                              args[0],
+                              credentialBuilder.build());
+    } else {
+      mesos = version == 1
+                ? new JNIMesos(scheduler, args[0])
+                : new V0Mesos(scheduler, frameworkBuilder.build(), args[0]);
+    }
+
+    lock.lock();
+    try {
+      while (!finished) {
+        finishedCondtion.await();
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    System.exit(0);
+  }
+
+  static boolean finished = false;
+  final static Lock lock = new ReentrantLock();
+  final static Condition finishedCondtion = lock.newCondition();
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/v1-test-framework.in
----------------------------------------------------------------------
diff --git a/src/examples/java/v1-test-framework.in b/src/examples/java/v1-test-framework.in
new file mode 100644
index 0000000..6e5e581
--- /dev/null
+++ b/src/examples/java/v1-test-framework.in
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Locate Java from environment or use configure discovered location.
+JAVA_HOME=${JAVA_HOME-@JAVA_HOME@}
+JAVA=${JAVA-${JAVA_HOME}/bin/java}
+
+# Use colors for errors.
+. ${MESOS_SOURCE_DIR}/support/colors.sh
+
+PROTOBUF_JAR=@PROTOBUF_JAR@
+
+test ! -e ${PROTOBUF_JAR} && \
+  echo "${RED}Failed to find ${PROTOBUF_JAR}${NORMAL}" && \
+  exit 1
+
+MESOS_JAR=${MESOS_BUILD_DIR}/src/java/target/mesos-@PACKAGE_VERSION@.jar
+
+test ! -e ${MESOS_JAR} && \
+  echo "${RED}Failed to find ${MESOS_JAR}${NORMAL}" && \
+  exit 1
+
+EXAMPLES_JAR=${MESOS_BUILD_DIR}/src/examples.jar
+
+test ! -e ${EXAMPLES_JAR} && \
+  echo "${RED}Failed to find ${EXAMPLES_JAR}${NORMAL}" && \
+  exit 1
+
+# Need to run in the directory containing this script so that the
+# framework is able to find the executor.
+cd `dirname ${0}`
+
+exec ${JAVA} -cp ${PROTOBUF_JAR}:${MESOS_JAR}:${EXAMPLES_JAR} \
+  -Djava.library.path=${MESOS_BUILD_DIR}/src/.libs \
+  V1TestFramework "${@}"

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index cac5304..52fac33 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -38,6 +38,10 @@ TEST_SCRIPT(ExamplesTest, DiskFullFramework,
 TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh")
+
+// TODO(anand): Parameterize these tests on version.
+TEST_SCRIPT(ExamplesTest, V0JavaFramework, "java_v0_framework_test.sh")
+TEST_SCRIPT(ExamplesTest, V1JavaFramework, "java_v1_framework_test.sh")
 #endif
 
 #ifdef MESOS_HAS_PYTHON

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v0_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v0_framework_test.sh b/src/tests/java_v0_framework_test.sh
new file mode 100755
index 0000000..12daae4
--- /dev/null
+++ b/src/tests/java_v0_framework_test.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Lower the authentication timeout to speed up the test (the master
+# may drop the authentication message while it is recovering).
+export MESOS_AUTHENTICATION_TIMEOUT=200ms
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 0

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v1_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v1_framework_test.sh b/src/tests/java_v1_framework_test.sh
new file mode 100755
index 0000000..c49e11c
--- /dev/null
+++ b/src/tests/java_v1_framework_test.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set the connection delay to 0 to speed up the tests.
+export MESOS_CONNECTION_DELAY_MAX=0ms;
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 1


[6/7] mesos git commit: Added native implementation for the V0 Mesos Adapter.

Posted by an...@apache.org.
Added native implementation for the V0 Mesos Adapter.

This change adds the C++ implementation for the JAVA V0 to V1 Mesos
implementation adapter (driver + scheduler).

Review: https://reviews.apache.org/r/50253


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68c27d10
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68c27d10
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68c27d10

Branch: refs/heads/master
Commit: 68c27d1095dd33e569a96a05337dbd961cfdd6fe
Parents: bca68f6
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:23:05 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   9 +-
 .../org_apache_mesos_v1_scheduler_V0Mesos.cpp   | 893 +++++++++++++++++++
 2 files changed, 901 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b02b5f..b9cc040 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1563,6 +1563,7 @@ libjava_la_SOURCES =							\
   java/jni/org_apache_mesos_state_Variable.cpp				\
   java/jni/org_apache_mesos_state_ZooKeeperState.cpp			\
   java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp			\
+  java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp			\
   jvm/jvm.cpp								\
   jvm/jvm.hpp								\
   jvm/java/io.hpp							\
@@ -1605,7 +1606,8 @@ nodist_libjava_la_SOURCES =						\
   java/jni/org_apache_mesos_state_LogState.h				\
   java/jni/org_apache_mesos_state_Variable.h				\
   java/jni/org_apache_mesos_state_ZooKeeperState.h			\
-  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h
+  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h			\
+  java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h
 
 BUILT_SOURCES += $(nodist_libjava_la_SOURCES)
 
@@ -1654,6 +1656,11 @@ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h: $(MESOS_JAR)
 	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
 	  org.apache.mesos.v1.scheduler.JNIMesos
 
+java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h: $(MESOS_JAR)
+	$(JAVA_HOME)/bin/javah -d java/jni				\
+	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
+	  org.apache.mesos.v1.scheduler.V0Mesos
+
 $(EXAMPLES_JAR): $(EXAMPLES_SOURCE)
 	@echo "Building examples.jar ..."
 	$(MKDIR_P) examples/java

http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
new file mode 100644
index 0000000..7febe95
--- /dev/null
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -0,0 +1,893 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/unreachable.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "jvm/jvm.hpp"
+
+#include "master/constants.hpp"
+#include "master/validation.hpp"
+
+#include "convert.hpp"
+#include "construct.hpp"
+#include "org_apache_mesos_v1_scheduler_V0Mesos.h"
+
+using namespace mesos;
+using namespace mesos::internal::master;
+
+using std::queue;
+using std::string;
+using std::vector;
+
+using mesos::internal::devolve;
+using mesos::internal::evolve;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+using process::Clock;
+using process::Owned;
+using process::Timer;
+
+class V0ToV1AdapterProcess; // Forward declaration.
+
+// This interface acts as an adapter from the v0 (driver + scheduler) to the
+// v1 Mesos scheduler.
+class V0ToV1Adapter : public mesos::Scheduler, public v1::scheduler::MesosBase
+{
+public:
+  V0ToV1Adapter(
+      JNIEnv* env,
+      jweak jmesos,
+      const FrameworkInfo& frameworkInfo,
+      const string& master,
+      const Option<Credential>& credential);
+
+  virtual ~V0ToV1Adapter();
+
+  // v0 Scheduler interface overrides.
+  virtual void registered(
+      SchedulerDriver* driver,
+      const FrameworkID& frameworkId,
+      const MasterInfo& masterInfo) override;
+
+  virtual void reregistered(
+      SchedulerDriver* driver,
+      const MasterInfo& masterInfo) override;
+
+  virtual void disconnected(SchedulerDriver* driver) override;
+
+  virtual void resourceOffers(
+      SchedulerDriver* driver,
+      const vector<Offer>& offers) override;
+
+  virtual void offerRescinded(
+      SchedulerDriver* driver,
+      const OfferID& offerId) override;
+
+  virtual void statusUpdate(
+      SchedulerDriver* driver,
+      const TaskStatus& status) override;
+
+  virtual void frameworkMessage(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data) override;
+
+  virtual void slaveLost(
+      SchedulerDriver* driver,
+      const SlaveID& slaveId) override;
+
+  virtual void executorLost(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status) override;
+
+  virtual void error(
+      SchedulerDriver* driver,
+      const string& message) override;
+
+  // v1 MesosBase interface overrides.
+  virtual void send(const v1::scheduler::Call& call) override;
+
+  virtual void reconnect() override
+  {
+    // The driver does not support explicit reconnection with the master.
+    UNREACHABLE();
+  }
+
+  process::Owned<V0ToV1AdapterProcess> process;
+
+private:
+  Owned<MesosSchedulerDriver> driver;
+};
+
+
+// The process (below) is responsible for ensuring synchronized access between
+// callbacks received from the driver and calls invoked by the adapter.
+class V0ToV1AdapterProcess : public process::Process<V0ToV1AdapterProcess>
+{
+public:
+  V0ToV1AdapterProcess(JNIEnv* env, jweak jmesos);
+
+  virtual ~V0ToV1AdapterProcess() = default;
+
+  void registered(const FrameworkID& frameworkId);
+
+  void reregistered();
+
+  void disconnected();
+
+  void resourceOffers(const vector<Offer>& offers);
+
+  void offerRescinded(const OfferID& offerId);
+
+  void statusUpdate(const TaskStatus& status);
+
+  void frameworkMessage(
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data);
+
+  void slaveLost(const SlaveID& slaveId);
+
+  void executorLost(
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status);
+
+  void error(const string& message);
+
+  void send(SchedulerDriver*, const v1::scheduler::Call& call);
+
+  JavaVM* jvm;
+  JNIEnv* env;
+  jweak jmesos;
+
+protected:
+  void received(const Event& event);
+
+  void _received();
+
+  void __received(const Event& event);
+
+  void heartbeat();
+
+  void disconnect();
+
+private:
+  bool subscribeCall;
+  const Duration interval;
+  queue<Event> pending;
+  Option<FrameworkID> frameworkId;
+  Option<Timer> heartbeatTimer;
+};
+
+
+V0ToV1Adapter::V0ToV1Adapter(
+    JNIEnv* env,
+    jweak jmesos,
+    const FrameworkInfo& frameworkInfo,
+    const string& master,
+    const Option<Credential>& credential)
+  : process(new V0ToV1AdapterProcess(env, jmesos))
+{
+  spawn(process.get());
+
+  driver.reset(
+      credential.isSome()
+        // Disable implicit acks.
+        ? new MesosSchedulerDriver(
+              this, frameworkInfo, master, false, credential.get())
+        : new MesosSchedulerDriver(this, frameworkInfo, master, false));
+
+  driver->start();
+}
+
+
+V0ToV1Adapter::~V0ToV1Adapter()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+void V0ToV1Adapter::error(
+    SchedulerDriver*,
+    const string& message)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::error, message);
+}
+
+
+void V0ToV1Adapter::executorLost(
+    SchedulerDriver*,
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    int status)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::executorLost,
+      executorId,
+      slaveId,
+      status);
+}
+
+
+void V0ToV1Adapter::slaveLost(
+    SchedulerDriver*,
+    const SlaveID& slaveId)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::slaveLost, slaveId);
+}
+
+
+void V0ToV1Adapter::frameworkMessage(
+    SchedulerDriver*,
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    const string& data)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::frameworkMessage,
+      executorId,
+      slaveId,
+      data);
+}
+
+
+void V0ToV1Adapter::statusUpdate(
+    SchedulerDriver*,
+    const TaskStatus& status)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::statusUpdate, status);
+}
+
+
+void V0ToV1Adapter::offerRescinded(
+    SchedulerDriver*,
+    const OfferID& offerId)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::offerRescinded, offerId);
+}
+
+
+void V0ToV1Adapter::resourceOffers(
+    SchedulerDriver*,
+    const vector<Offer>& offers)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::resourceOffers, offers);
+}
+
+
+void V0ToV1Adapter::registered(
+    SchedulerDriver*,
+    const FrameworkID &frameworkId,
+    const MasterInfo&)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::registered,
+      frameworkId);
+}
+
+
+void V0ToV1Adapter::reregistered(
+    SchedulerDriver*,
+    const MasterInfo&)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::reregistered);
+}
+
+
+void V0ToV1Adapter::disconnected(SchedulerDriver*)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::disconnected);
+}
+
+
+void V0ToV1Adapter::send(const Call& call)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::send, driver.get(), call);
+}
+
+
+V0ToV1AdapterProcess::V0ToV1AdapterProcess(
+    JNIEnv* _env,
+    jweak _jmesos)
+  : ProcessBase(process::ID::generate("SchedulerV0ToV1Adapter")),
+    jvm(nullptr),
+    env(_env),
+    jmesos(_jmesos),
+    subscribeCall(false),
+    interval(DEFAULT_HEARTBEAT_INTERVAL)
+{
+  env->GetJavaVM(&jvm);
+}
+
+
+void V0ToV1AdapterProcess::registered(const FrameworkID& _frameworkId)
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.connected(mesos);
+  jmethodID connected =
+    env->GetMethodID(clazz, "connected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, connected, jmesos);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `connected` call");
+  }
+
+  jvm->DetachCurrentThread();
+
+  // We need this copy to populate the fields in `Event::Subscribed` upon
+  // receiving a `reregistered()` callback later.
+  frameworkId = _frameworkId;
+
+  // These events are queued and delivered to the scheduler upon receiving the
+  // subscribe call later. See comments in `send()` for more details.
+  {
+    Event event;
+    event.set_type(Event::SUBSCRIBED);
+
+    Event::Subscribed* subscribed = event.mutable_subscribed();
+
+    subscribed->mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+
+    subscribed->set_heartbeat_interval_seconds(interval.secs());
+
+    received(event);
+  }
+
+  {
+    Event event;
+    event.set_type(Event::HEARTBEAT);
+
+    received(event);
+  }
+}
+
+
+void V0ToV1AdapterProcess::reregistered()
+{
+  CHECK_SOME(frameworkId);
+  registered(frameworkId.get());
+}
+
+
+void V0ToV1AdapterProcess::disconnected()
+{
+  disconnect();
+
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.disconnected(mesos);
+  jmethodID disconnected =
+    env->GetMethodID(clazz, "disconnected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jmesos, disconnected);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `disconnected` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::resourceOffers(const vector<Offer>& _offers)
+{
+  Event event;
+  event.set_type(Event::OFFERS);
+
+  Event::Offers* offers = event.mutable_offers();
+
+  foreach (const Offer& offer, _offers) {
+    offers->add_offers()->CopyFrom(evolve(offer));
+  }
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::offerRescinded(const OfferID& offerId)
+{
+  Event event;
+  event.set_type(Event::RESCIND);
+
+  event.mutable_rescind()->mutable_offer_id()->
+    CopyFrom(evolve(offerId));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::statusUpdate(const TaskStatus& status)
+{
+  Event event;
+  event.set_type(Event::UPDATE);
+
+  event.mutable_update()->mutable_status()->
+    CopyFrom(mesos::internal::evolve(status));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::frameworkMessage(
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    const string& data)
+{
+  Event event;
+  event.set_type(Event::MESSAGE);
+
+  event.mutable_message()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  event.mutable_message()->mutable_executor_id()->
+    CopyFrom(mesos::internal::evolve(executorId));
+
+  event.mutable_message()->set_data(data.data());
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::slaveLost(const SlaveID& slaveId)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  event.mutable_failure()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::executorLost(
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    int status)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  event.mutable_failure()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  event.mutable_failure()->mutable_executor_id()->
+    CopyFrom(mesos::internal::evolve(executorId));
+
+  event.mutable_failure()->set_status(status);
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::error(const string& message)
+{
+  Event event;
+  event.set_type(Event::ERROR);
+
+  event.mutable_error()->set_message(message);
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::send(SchedulerDriver* driver, const Call& _call)
+{
+  CHECK_NOTNULL(driver);
+
+  scheduler::Call call = devolve(_call);
+
+  Option<Error> error = validation::scheduler::call::validate(call);
+  if (error.isSome()) {
+    LOG(WARNING) << "Dropping " << call.type() << ": due to error "
+                 << error->message;
+    return;
+  }
+
+  switch (call.type()) {
+    case Call::SUBSCRIBE: {
+      subscribeCall = true;
+
+      heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+
+      // The driver subscribes implicitly with the master upon initialization.
+      // For compatibility with the v1 interface, send the already enqueued
+      // subscribed event upon receiving the subscribe request.
+      _received();
+      break;
+    }
+
+    case Call::TEARDOWN: {
+      driver->stop(false);
+      break;
+    }
+
+    case Call::ACCEPT: {
+      vector<OfferID> offerIds;
+      foreach (const OfferID& offerId, call.accept().offer_ids()) {
+        offerIds.emplace_back(offerId);
+      }
+
+      vector<Offer::Operation> operations;
+      foreach (const Offer::Operation& operation, call.accept().operations()) {
+        operations.emplace_back(operation);
+      }
+
+      if (call.accept().has_filters()) {
+        driver->acceptOffers(offerIds, operations, call.accept().filters());
+      } else {
+        driver->acceptOffers(offerIds, operations);
+      }
+
+      break;
+    }
+
+    case Call::ACCEPT_INVERSE_OFFERS:
+    case Call::DECLINE_INVERSE_OFFERS:
+    case Call::SHUTDOWN: {
+      // TODO(anand): Throw java error.
+      LOG(ERROR) << "Received an unexpected " << call.type() << " call";
+      break;
+    }
+
+    case Call::DECLINE: {
+      foreach (const OfferID& offerId, call.decline().offer_ids()) {
+        if (call.decline().has_filters()) {
+          driver->declineOffer(offerId, call.decline().filters());
+        } else {
+          driver->declineOffer(offerId);
+        }
+      }
+
+      break;
+    }
+
+    case Call::REVIVE: {
+      driver->reviveOffers();
+      break;
+    }
+
+    case Call::KILL: {
+      driver->killTask(call.kill().task_id());
+      break;
+    }
+
+    case Call::ACKNOWLEDGE: {
+      TaskStatus status;
+      status.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
+      status.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
+      status.set_uuid(call.acknowledge().uuid());
+
+      driver->acknowledgeStatusUpdate(status);
+      break;
+    }
+
+    case Call::RECONCILE: {
+      vector<TaskStatus> statuses;
+
+      foreach (const scheduler::Call::Reconcile::Task& task,
+               call.reconcile().tasks()) {
+        TaskStatus status;
+        status.mutable_task_id()->CopyFrom(task.task_id());
+        statuses.emplace_back(status);
+      }
+
+      driver->reconcileTasks(statuses);
+      break;
+    }
+
+    case Call::MESSAGE: {
+      driver->sendFrameworkMessage(
+          call.message().executor_id(),
+          call.message().slave_id(),
+          string(call.message().data()));
+      break;
+    }
+
+    case Call::REQUEST: {
+      vector<Request> requests;
+
+      foreach (const Request& request, call.request().requests()) {
+        requests.emplace_back(request);
+      }
+
+      driver->requestResources(requests);
+      break;
+    }
+
+    case Call::SUPPRESS: {
+      driver->suppressOffers();
+      break;
+    }
+
+    case Call::UNKNOWN: {
+      EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
+                         << " call";
+      break;
+    }
+  }
+}
+
+
+void V0ToV1AdapterProcess::received(const Event& event)
+{
+  // For compatibility with the v1 interface, we only start sending events
+  // once the scheduler has sent the subscribe call.
+  if (!subscribeCall) {
+    pending.push(event);
+    return;
+  }
+
+  pending.push(event);
+
+  _received();
+}
+
+
+void V0ToV1AdapterProcess::_received()
+{
+  CHECK(subscribeCall);
+
+  while (!pending.empty()) {
+    __received(pending.front());
+    pending.pop();
+  }
+}
+
+
+void V0ToV1AdapterProcess::__received(const Event& event)
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.received(mesos, event);
+  jmethodID received =
+    env->GetMethodID(clazz, "received",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;"
+                     "Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
+
+  jobject jevent = convert<Event>(env, event);
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, received, jmesos, jevent);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `received` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::heartbeat()
+{
+  // It is possible that we were unable to cancel this timer upon a
+  // disconnection. If this occurs, don't bother sending the heartbeat
+  // event.
+  if (heartbeatTimer.isNone() || !heartbeatTimer->timeout().expired()) {
+    return;
+  }
+
+  CHECK(subscribeCall)
+    << "Cannot send heartbeat events to the scheduler without receiving a "
+    << "subscribe call";
+
+  Event event;
+  event.set_type(Event::HEARTBEAT);
+
+  received(event);
+
+  heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+}
+
+
+void V0ToV1AdapterProcess::disconnect()
+{
+  // Upon noticing a disconnection with the master, we drain the pending
+  // events in the queue that were waiting to be sent to the scheduler
+  // upon receiving the subscribe call.
+  // It's fine to do so because:
+  // - Any outstanding offers are invalidated by the master upon a scheduler
+  //   (re-)registration.
+  // - Any task status updates could be reconciled by the scheduler.
+  pending = queue<Event>();
+  subscribeCall = false;
+
+  if (heartbeatTimer.isSome()) {
+    Clock::cancel(heartbeatTimer.get());
+    heartbeatTimer = None();
+  }
+}
+
+
+extern "C" {
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    initialize
+ * Signature: ()V
+ *
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_initialize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  // Create a weak global reference to the Scheduler
+  // instance (we want a global reference so the GC doesn't collect
+  // the instance but we make it weak so the JVM can exit).
+  jweak jmesos = env->NewWeakGlobalRef(thiz);
+
+  // Get out the FrameworkInfo passed into the constructor.
+  jfieldID framework =
+    env->GetFieldID(clazz, "framework",
+                    "Lorg/apache/mesos/v1/Protos$FrameworkInfo;");
+
+  jobject jframework = env->GetObjectField(thiz, framework);
+
+  // Get out the master passed into the constructor.
+  jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
+  jobject jmaster = env->GetObjectField(thiz, master);
+
+  // Get out the credential passed into the constructor.
+  jfieldID credential =
+    env->GetFieldID(clazz, "credential",
+                    "Lorg/apache/mesos/v1/Protos$Credential;");
+
+  jobject jcredential = env->GetObjectField(thiz, credential);
+
+  Option<Credential> credential_;
+  if (!env->IsSameObject(jcredential, nullptr)) {
+    credential_ = construct<Credential>(env, jcredential);
+  }
+
+  // Create the C++ scheduler and initialize the `__mesos` variable.
+  V0ToV1Adapter* mesos =
+    new V0ToV1Adapter(
+        env,
+        jmesos,
+        devolve(construct<v1::FrameworkInfo>(env, jframework)),
+        construct<string>(env, jmaster),
+        credential_);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  env->SetLongField(thiz, __mesos, (jlong) mesos);
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    finalize
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_finalize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+  V0ToV1Adapter* mesos =
+    (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+  env->DeleteWeakGlobalRef(mesos->process->jmesos);
+
+  delete mesos;
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    send
+ * Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_send
+  (JNIEnv* env, jobject thiz, jobject jcall)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+  V0ToV1Adapter* mesos =
+    (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+  mesos->send(construct<Call>(env, jcall));
+}
+
+} // extern "C" {


[5/7] mesos git commit: Added v1 Scheduler/Mesos interface in Java.

Posted by an...@apache.org.
Added v1 Scheduler/Mesos interface in Java.

This change adds an interface for the v1 Scheduler + Mesos
interface that the schedulers can use to connect to Mesos.

Review: https://reviews.apache.org/r/50250


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c1ba4a5a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c1ba4a5a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c1ba4a5a

Branch: refs/heads/master
Commit: c1ba4a5a2dc41e6b19b08ef8f62af8b62bc2a481
Parents: 07a1802
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:36:22 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |  5 +-
 .../org/apache/mesos/v1/scheduler/Mesos.java    | 58 ++++++++++++++++++++
 .../apache/mesos/v1/scheduler/Scheduler.java    | 53 ++++++++++++++++++
 3 files changed, 115 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 5eabc33..fa4dea2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1515,7 +1515,10 @@ MESOS_JAR_SOURCE =							\
   $(srcdir)/java/src/org/apache/mesos/state/LogState.java		\
   $(srcdir)/java/src/org/apache/mesos/state/State.java			\
   $(srcdir)/java/src/org/apache/mesos/state/Variable.java		\
-  $(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java
+  $(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java		\
+  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java		\
+  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Mesos.java		\
+  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
 MESOS_JAR_GENERATED = $(JAVA_PROTOS) $(V1_JAVA_PROTOS)			\
   java/generated/org/apache/mesos/MesosNativeLibrary.java
 EXTRA_DIST += $(MESOS_JAR_SOURCE)					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java b/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
new file mode 100644
index 0000000..db0116a
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/Mesos.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+
+/**
+ * Abstract interface for connecting a scheduler to Mesos. This interface
+ * is used to send Call's to Mesos (e.g., launch tasks, kill tasks, etc.).
+ */
+public interface Mesos {
+  /**
+   * Sends a Call to the Mesos master. The scheduler should only invoke this
+   * method once it has received the 'connected' callback. Otherwise, all calls
+   * would be dropped while disconnected.
+   *
+   * Some local validation of calls is performed which may generate
+   * events without ever being sent to the master.
+   *
+   * These comments are copied from include/mesos/v1/scheduler.hpp and should
+   * be kept in sync.
+   */
+  void send(Call call);
+
+  /**
+   * Force a reconnection with the Mesos master.
+   *
+   * In the case of a one-way network partition, the connection between the
+   * scheduler and master might not necessarily break. If the scheduler detects
+   * a partition, due to lack of `HEARTBEAT` events (e.g., 5) within a time
+   * window, it can explicitly ask the library to force a reconnection with
+   * the master.
+   *
+   * This call would be ignored if the scheduler is already disconnected with
+   * the master (e.g., no new master has been elected). Otherwise, the scheduler
+   * would get a 'disconnected' callback followed by a 'connected' callback.
+   *
+   * These comments are copied from include/mesos/v1/scheduler.hpp and should
+   * be kept in sync.
+   */
+  void reconnect();
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/c1ba4a5a/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java b/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
new file mode 100644
index 0000000..28efaff
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+/**
+ * Callback interface to be implemented by schedulers.
+ * Note that only one callback will be invoked at a time,
+ * so it is not recommended that you block within a callback because
+ * it may cause a deadlock.
+ * <p>
+ * Each callback includes a reference to the Mesos interface that was
+ * used to run this scheduler. The reference will not change for the
+ * duration of a scheduler from the time it is instantiated.
+ * This is intended for convenience so that a scheduler doesn't need to
+ * store a reference to the interface itself.
+ */
+
+public interface Scheduler {
+  /**
+   * Invoked when a connection is established with the master upon a
+   * master (re-)detection.
+   */
+  void connected(Mesos mesos);
+
+  /**
+   * Invoked when no master is detected or when the existing persistent
+   * connection is interrupted.
+   */
+  void disconnected(Mesos mesos);
+
+  /**
+   * Invoked when a new event is received from the Mesos master.
+   */
+  void received(Mesos mesos, Event event);
+}


[2/7] mesos git commit: Added native implementation for v1 Mesos interface.

Posted by an...@apache.org.
Added native implementation for v1 Mesos interface.

This change adds the native C++ implementation for the v1
Java class `JNIMesos` used for interacting with Mesos.

Review: https://reviews.apache.org/r/50252


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bca68f63
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bca68f63
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bca68f63

Branch: refs/heads/master
Commit: bca68f63aa04d91cdb44b8e100f364bf981d6bd2
Parents: 5855532
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:17:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   9 +-
 .../org_apache_mesos_v1_scheduler_JNIMesos.cpp  | 317 +++++++++++++++++++
 2 files changed, 325 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bca68f63/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3a743e4..2b02b5f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1562,6 +1562,7 @@ libjava_la_SOURCES =							\
   java/jni/org_apache_mesos_state_LogState.cpp				\
   java/jni/org_apache_mesos_state_Variable.cpp				\
   java/jni/org_apache_mesos_state_ZooKeeperState.cpp			\
+  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp			\
   jvm/jvm.cpp								\
   jvm/jvm.hpp								\
   jvm/java/io.hpp							\
@@ -1603,7 +1604,8 @@ nodist_libjava_la_SOURCES =						\
   java/jni/org_apache_mesos_state_LevelDBState.h			\
   java/jni/org_apache_mesos_state_LogState.h				\
   java/jni/org_apache_mesos_state_Variable.h				\
-  java/jni/org_apache_mesos_state_ZooKeeperState.h
+  java/jni/org_apache_mesos_state_ZooKeeperState.h			\
+  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h
 
 BUILT_SOURCES += $(nodist_libjava_la_SOURCES)
 
@@ -1647,6 +1649,11 @@ java/jni/org_apache_mesos_state_ZooKeeperState.h: $(MESOS_JAR)
 	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
 	  org.apache.mesos.state.ZooKeeperState
 
+java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h: $(MESOS_JAR)
+	$(JAVA_HOME)/bin/javah -d java/jni				\
+	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
+	  org.apache.mesos.v1.scheduler.JNIMesos
+
 $(EXAMPLES_JAR): $(EXAMPLES_SOURCE)
 	@echo "Building examples.jar ..."
 	$(MKDIR_P) examples/java

http://git-wip-us.apache.org/repos/asf/mesos/blob/bca68f63/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
new file mode 100644
index 0000000..e197b37
--- /dev/null
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+#include <vector>
+
+#include <process/owned.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/result.hpp>
+#include <stout/try.hpp>
+
+#include "jvm/jvm.hpp"
+
+#include "construct.hpp"
+#include "convert.hpp"
+#include "org_apache_mesos_v1_scheduler_JNIMesos.h"
+
+using namespace mesos::v1::scheduler;
+
+using mesos::v1::Credential;
+
+using process::Owned;
+
+using std::string;
+using std::vector;
+
+namespace v1 {
+
+class JNIMesos
+{
+public:
+  JNIMesos(
+      JNIEnv* _env,
+      jweak _jmesos,
+      const string& master,
+      const Option<Credential>& credential)
+    : jvm(nullptr), env(_env), jmesos(_jmesos)
+  {
+    env->GetJavaVM(&jvm);
+
+    mesos.reset(
+        new Mesos(master,
+            mesos::ContentType::PROTOBUF,
+            std::bind(&JNIMesos::connected, this),
+            std::bind(&JNIMesos::disconnected, this),
+            std::bind(&JNIMesos::received_, this, lambda::_1),
+            credential));
+  }
+
+  virtual ~JNIMesos() = default;
+
+  virtual void connected();
+  virtual void disconnected();
+  virtual void received(const Event& event);
+
+  void received_(std::queue<Event> events) {
+    while (!events.empty()) {
+      received(events.front());
+      events.pop();
+    }
+  }
+
+  JavaVM* jvm;
+  JNIEnv* env;
+  jweak jmesos;
+
+  Owned<Mesos> mesos;
+};
+
+
+void JNIMesos::connected()
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.connected(mesos);
+  jmethodID connected =
+    env->GetMethodID(clazz, "connected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, connected, jmesos);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `connected` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void JNIMesos::disconnected()
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.disconnected(mesos);
+  jmethodID disconnected =
+    env->GetMethodID(clazz, "disconnected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, disconnected, jmesos);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `disconnected` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void JNIMesos::received(const Event& event)
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.received(mesos, event);
+  jmethodID received =
+    env->GetMethodID(clazz, "received",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;"
+                     "Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
+
+  jobject jevent = convert<Event>(env, event);
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, received, jmesos, jevent);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `received` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+} // namespace v1 {
+
+
+extern "C" {
+
+/*
+ * Class:     org_apache_mesos_v1_JNIMesos
+ * Method:    initialize
+ * Signature: ()V
+ *
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_initialize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  // Create a weak global reference to the Scheduler
+  // instance (we want a global reference so the GC doesn't collect
+  // the instance but we make it weak so the JVM can exit).
+  jweak jmesos = env->NewWeakGlobalRef(thiz);
+
+  // Get out the master passed into the constructor.
+  jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
+  jobject jmaster = env->GetObjectField(thiz, master);
+
+  // Get out the credential passed into the constructor.
+  jfieldID credential =
+    env->GetFieldID(clazz, "credential",
+                    "Lorg/apache/mesos/v1/Protos$Credential;");
+
+  jobject jcredential = env->GetObjectField(thiz, credential);
+
+  Option<Credential> credential_;
+  if (!env->IsSameObject(jcredential, nullptr)) {
+    credential_ = construct<Credential>(env, jcredential);
+  }
+
+  // Create the C++ scheduler and initialize `__mesos`.
+  v1::JNIMesos* mesos =
+    new v1::JNIMesos(env, jmesos, construct<string>(env, jmaster), credential_);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  env->SetLongField(thiz, __mesos, (jlong) mesos);
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_JNIMesos
+ * Method:    finalize
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_finalize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  v1::JNIMesos* mesos =
+    (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+  env->DeleteWeakGlobalRef(mesos->jmesos);
+
+  delete mesos;
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_JNIMesos
+ * Method:    send
+ * Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_send
+  (JNIEnv* env, jobject thiz, jobject jcall)
+{
+  // Construct a C++ Call from the Java Call.
+  const Call& call = construct<Call>(env, jcall);
+
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  v1::JNIMesos* mesos =
+    (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+  // It is possible that `mesos` might not be initialized in some cases due to
+  // a possible race condition. See MESOS-5926 for more details.
+  if (mesos->mesos.get() == nullptr) {
+    LOG(WARNING) << "Ignoring call " << call.type() << " as the library has "
+                 << "not been initialized yet";
+    return;
+  }
+
+  mesos->mesos->send(call);
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_JNIMesos
+ * Method:    reconnect
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_JNIMesos_reconnect
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  v1::JNIMesos* mesos =
+    (v1::JNIMesos*) env->GetLongField(thiz, __mesos);
+
+  // It is possible that `mesos` might not be initialized in some cases due to
+  // a possible race condition. See MESOS-5926 for more details.
+  if (mesos->mesos.get() == nullptr) {
+    LOG(WARNING) << "Ignoring the reconnect request as the library has not "
+                 << "been initialized yet";
+    return;
+  }
+
+  mesos->mesos->reconnect();
+}
+
+} // extern "C" {


[4/7] mesos git commit: Added a abstract base class for scheduler library.

Posted by an...@apache.org.
Added a abstract base class for scheduler library.

This change adds an abstract base class `MesosBase` that
would be used by implementations for interacting with
Mesos via the Scheduler API. This is needed later for
implementing the V0->V1 Scheduler Adapter.

Review: https://reviews.apache.org/r/50247


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e184216b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e184216b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e184216b

Branch: refs/heads/master
Commit: e184216b299424fc6b7b241fd9caeb7e2f209306
Parents: 9567fb4
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 17:28:27 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 include/mesos/v1/scheduler.hpp | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e184216b/include/mesos/v1/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp
index 18e7a95..d56e088 100644
--- a/include/mesos/v1/scheduler.hpp
+++ b/include/mesos/v1/scheduler.hpp
@@ -41,8 +41,19 @@ namespace scheduler {
 
 class MesosProcess; // Forward declaration.
 
-// Interface to Mesos for a scheduler. Abstracts master detection
-// (connection and disconnection).
+// Abstract interface for connecting a scheduler to Mesos.
+class MesosBase
+{
+public:
+  // Empty virtual destructor (necessary to instantiate subclasses).
+  virtual ~MesosBase() {}
+  virtual void send(const Call& call) = 0;
+  virtual void reconnect() = 0;
+};
+
+
+// Concrete implementation that connects a scheduler to a Mesos master.
+// Abstracts master detection (connection and disconnection).
 //
 // Expects three callbacks, 'connected', 'disconnected', and
 // 'received' which will get invoked _serially_ when it's determined
@@ -51,7 +62,7 @@ class MesosProcess; // Forward declaration.
 // The library reconnects with the master upon a disconnection.
 //
 // NOTE: All calls and events are dropped while disconnected.
-class Mesos
+class Mesos : public MesosBase
 {
 public:
   // The credential will be used for authenticating with the master. Currently,
@@ -81,7 +92,7 @@ public:
   // events without ever being sent to the master. This includes when
   // calls are sent but no master is currently detected (i.e., we're
   // disconnected).
-  virtual void send(const Call& call);
+  virtual void send(const Call& call) override;
 
   // Force a reconnection with the master.
   //
@@ -94,7 +105,7 @@ public:
   // This call would be ignored if the scheduler is already disconnected with
   // the master (e.g., no new master has been elected). Otherwise, the scheduler
   // would get a 'disconnected' callback followed by a 'connected' callback.
-  virtual void reconnect();
+  virtual void reconnect() override;
 
 protected:
   // NOTE: This constructor is used for testing.


[3/7] mesos git commit: Added helper functions for v1 JNI `construct()`/`convert()`.

Posted by an...@apache.org.
Added helper functions for v1 JNI `construct()`/`convert()`.

These would be used later in the chain for the making JNI
calls to v1 Mesos implementation in native code.

Review: https://reviews.apache.org/r/50248


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/07a1802f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/07a1802f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/07a1802f

Branch: refs/heads/master
Commit: 07a1802f1a6c1b7b3b778bb86d17254842322e5c
Parents: e184216
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:07:41 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/java/jni/construct.cpp | 67 +++++++++++++++++++++++++++++++++++++++++
 src/java/jni/convert.cpp   | 26 ++++++++++++++++
 2 files changed, 93 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/07a1802f/src/java/jni/construct.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp
index e5c070c..2645d9e 100644
--- a/src/java/jni/construct.cpp
+++ b/src/java/jni/construct.cpp
@@ -25,6 +25,10 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/v1/mesos.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
 #include "construct.hpp"
 
 using namespace mesos;
@@ -399,3 +403,66 @@ Offer::Operation construct(JNIEnv* env, jobject jobj)
 
   return operation;
 }
+
+
+template <>
+v1::Credential construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray);
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  const v1::Credential& credential = parse<v1::Credential>(data, length);
+
+  env->ReleaseByteArrayElements(jdata, data, 0);
+
+  return credential;
+}
+
+
+template <>
+v1::FrameworkInfo construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray);
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  const v1::FrameworkInfo& framework = parse<v1::FrameworkInfo>(data, length);
+
+  env->ReleaseByteArrayElements(jdata, data, 0);
+
+  return framework;
+}
+
+
+template<>
+v1::scheduler::Call construct(JNIEnv* env, jobject jobj)
+{
+  jclass clazz = env->GetObjectClass(jobj);
+
+  // byte[] data = obj.toByteArray();
+  jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B");
+
+  jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray);
+
+  jbyte* data = env->GetByteArrayElements(jdata, nullptr);
+  jsize length = env->GetArrayLength(jdata);
+
+  const v1::scheduler::Call& call = parse<v1::scheduler::Call>(data, length);
+
+  env->ReleaseByteArrayElements(jdata, data, 0);
+
+  return call;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/07a1802f/src/java/jni/convert.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/convert.cpp b/src/java/jni/convert.cpp
index 45ff488..338eb96 100644
--- a/src/java/jni/convert.cpp
+++ b/src/java/jni/convert.cpp
@@ -21,6 +21,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/v1/scheduler/scheduler.hpp>
+
 #include <stout/result.hpp>
 #include <stout/strings.hpp>
 
@@ -488,6 +490,30 @@ jobject convert(JNIEnv* env, const Status& status)
 }
 
 
+template <>
+jobject convert(JNIEnv* env, const v1::scheduler::Event& event)
+{
+  string data;
+  event.SerializeToString(&data);
+
+  // byte[] data = ..;
+  jbyteArray jdata = env->NewByteArray(data.size());
+  env->SetByteArrayRegion(jdata, 0, data.size(), (jbyte*) data.data());
+
+  // Event event = Event.parseFrom(data);
+  jclass clazz =
+    FindMesosClass(env, "org/apache/mesos/v1/scheduler/Protos$Event");
+
+  jmethodID parseFrom =
+    env->GetStaticMethodID(clazz, "parseFrom",
+                           "([B)Lorg/apache/mesos/v1/scheduler/Protos$Event;");
+
+  jobject jevent = env->CallStaticObjectMethod(clazz, parseFrom, jdata);
+
+  return jevent;
+}
+
+
 // Helper to safely return the 'jfieldID' of the given 'name'
 // and 'signature'. If the field doesn't exist 'None' is
 // returned. If any other JVM Exception is encountered an 'Error'


[7/7] mesos git commit: Added java implementations for the V0/V1 implementation for Mesos.

Posted by an...@apache.org.
Added java implementations for the V0/V1 implementation for Mesos.

Review: https://reviews.apache.org/r/50251


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58555326
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58555326
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58555326

Branch: refs/heads/master
Commit: 5855532668ab2a5f23914b26ccee70d7a5068302
Parents: c1ba4a5
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 19:56:50 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |  3 +-
 .../org/apache/mesos/v1/scheduler/JNIMesos.java | 84 ++++++++++++++++++++
 .../org/apache/mesos/v1/scheduler/V0Mesos.java  | 84 ++++++++++++++++++++
 3 files changed, 170 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fa4dea2..3a743e4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1518,7 +1518,8 @@ MESOS_JAR_SOURCE =							\
   $(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java		\
   $(srcdir)/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java		\
   $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Mesos.java		\
-  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java
+  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/Scheduler.java	\
+  $(srcdir)/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
 MESOS_JAR_GENERATED = $(JAVA_PROTOS) $(V1_JAVA_PROTOS)			\
   java/generated/org/apache/mesos/MesosNativeLibrary.java
 EXTRA_DIST += $(MESOS_JAR_SOURCE)					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java b/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
new file mode 100644
index 0000000..fc804a4
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/JNIMesos.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.MesosNativeLibrary;
+
+import org.apache.mesos.v1.Protos.Credential;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+
+/**
+ * Concrete implementation of the Mesos interface that connects a Scheduler
+ * with a Mesos master. This class is thread-safe.
+ * <p>
+ * This implementation uses the scheduler library (src/scheduler/scheduler.cpp)
+ * based on the V1 Mesos Scheduler API. The library is responsible for
+ * invoking the Scheduler callbacks as it communicates with the Mesos master.
+ * <p>
+ * <p>
+ * Note that the scheduler library uses GLOG to do its own logging. GLOG flags
+ * can be set via environment variables, prefixing the flag name with
+ * "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
+ * src/logging/flags.hpp. Mesos flags can also be set via environment
+ * variables, prefixing the flag name with "MESOS_", e.g., "MESOS_QUIET=1".
+ * <p>
+ * See src/examples/java/V1TestFramework.java for an example of using this.
+ */
+public class JNIMesos implements Mesos {
+  static {
+    MesosNativeLibrary.load();
+  }
+
+  public JNIMesos(Scheduler scheduler, String master) {
+    this(scheduler, master, null);
+  }
+
+  public JNIMesos(Scheduler scheduler, String master, Credential credential) {
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null scheduler");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    this.scheduler = scheduler;
+    this.master = master;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  @Override
+  public native void send(Call call);
+
+  @Override
+  public native void reconnect();
+
+  protected native void initialize();
+  protected native void finalize();
+
+  private final Scheduler scheduler;
+  private final String master;
+  private final Credential credential;
+
+  private long __mesos;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/58555326/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java b/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
new file mode 100644
index 0000000..fc262e1
--- /dev/null
+++ b/src/java/src/org/apache/mesos/v1/scheduler/V0Mesos.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.v1.scheduler;
+
+import org.apache.mesos.MesosNativeLibrary;
+
+import org.apache.mesos.v1.Protos.Credential;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+/**
+ * This implementation acts as an adapter from the v0 (driver + scheduler)
+ * to the v1 Scheduler interface. It uses the MesosSchedulerDriver under
+ * the hood for interacting with Mesos. This class is thread-safe.
+ */
+public class V0Mesos implements Mesos {
+  static {
+    MesosNativeLibrary.load();
+  }
+
+  public V0Mesos(Scheduler scheduler, FrameworkInfo framework, String master) {
+    this(scheduler, framework, master, null);
+  }
+
+  public V0Mesos(Scheduler scheduler,
+                 FrameworkInfo framework,
+                 String master,
+                 Credential credential) {
+    if (scheduler == null) {
+      throw new NullPointerException("Not expecting a null scheduler");
+    }
+
+    if (framework == null) {
+      throw new NullPointerException("Not expecting a null framework");
+    }
+
+    if (master == null) {
+      throw new NullPointerException("Not expecting a null master");
+    }
+
+    this.scheduler = scheduler;
+    this.framework = framework;
+    this.master = master;
+    this.credential = credential;
+
+    initialize();
+  }
+
+  @Override
+  public native void send(Call call);
+
+  // This is currently a no-op for the driver as it does not expose semantics
+  // to force reconnection.
+  @Override
+  public void reconnect() {}
+
+  protected native void initialize();
+  protected native void finalize();
+
+  private final Scheduler scheduler;
+  private final FrameworkInfo framework;
+  private final String master;
+  private final Credential credential;
+
+  private long __mesos;
+}