You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by se...@apache.org on 2015/01/31 00:51:38 UTC
[6/8] incubator-reef git commit: [REEF-116] Moving bridge code to
proper folder structure
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp
new file mode 100644
index 0000000..8ca1e65
--- /dev/null
+++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/RunningTaskClr2Java.cpp
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "Clr2JavaImpl.h"
+
+namespace Org {
+ namespace Apache {
+ namespace Reef {
+ namespace Driver {
+ namespace Bridge {
+ ref class ManagedLog {
+ internal:
+ static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>");
+ };
+ RunningTaskClr2Java::RunningTaskClr2Java(JNIEnv *env, jobject jobjectRunningTask) {
+ ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::RunningTaskClr2Java");
+
+ pin_ptr<JavaVM*> pJavaVm = &_jvm;
+ if (env->GetJavaVM(pJavaVm) != 0) {
+ ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr);
+ }
+ _jobjectRunningTask = reinterpret_cast<jobject>(env->NewGlobalRef(jobjectRunningTask));
+
+ jclass jclassRunningTask = env->GetObjectClass (_jobjectRunningTask);
+ jmethodID jmidGetId = env->GetMethodID(jclassRunningTask, "getId", "()Ljava/lang/String;");
+
+ _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env -> CallObjectMethod(_jobjectRunningTask, jmidGetId)));
+ ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::RunningTaskClr2Java");
+ }
+
+ IActiveContextClr2Java^ RunningTaskClr2Java::GetActiveContext() {
+ ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::GetActiveContext");
+
+ JNIEnv *env = RetrieveEnv(_jvm);
+
+ jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+ jfieldID jidActiveContext = env->GetFieldID(jclassRunningTask, "jactiveContext", "Lorg/apache/reef/javabridge/ActiveContextBridge;");
+ jobject jobjectActiveContext = env->GetObjectField(_jobjectRunningTask, jidActiveContext);
+ ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::GetActiveContext");
+
+ return gcnew ActiveContextClr2Java(env, jobjectActiveContext);
+ }
+
+ String^ RunningTaskClr2Java::GetId() {
+ ManagedLog::LOGGER->Log("RunningTaskClr2Java::GetId");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ return ManagedStringFromJavaString(env, _jstringId);
+ }
+
+ void RunningTaskClr2Java::Send(array<byte>^ message) {
+ ManagedLog::LOGGER->LogStart("RunningTaskClr2Java::Send");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ jclass jclassRunningTask = env->GetObjectClass(_jobjectRunningTask);
+ jmethodID jmidSend = env->GetMethodID(jclassRunningTask, "send", "([B)V");
+
+
+ if (jmidSend == NULL) {
+ ManagedLog::LOGGER->Log("jmidSend is NULL");
+ return;
+ }
+ env->CallObjectMethod(
+ _jobjectRunningTask,
+ jmidSend,
+ JavaByteArrayFromManagedByteArray(env, message));
+ ManagedLog::LOGGER->LogStop("RunningTaskClr2Java::Send");
+ }
+
+ void RunningTaskClr2Java::OnError(String^ message) {
+ ManagedLog::LOGGER->Log("RunningTaskClr2Java::OnError");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ HandleClr2JavaError(env, message, _jobjectRunningTask);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp
new file mode 100644
index 0000000..a10f88e
--- /dev/null
+++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/SuspendedTaskClr2Java.cpp
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "Clr2JavaImpl.h"
+
+namespace Org {
+ namespace Apache {
+ namespace Reef {
+ namespace Driver {
+ namespace Bridge {
+ ref class ManagedLog {
+ internal:
+ static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>");
+ };
+
+ SuspendedTaskClr2Java::SuspendedTaskClr2Java(JNIEnv *env, jobject jobjectSuspendedTask) {
+ ManagedLog::LOGGER->LogStart("SuspendedTaskClr2Java::SuspendedTaskClr2Java");
+ pin_ptr<JavaVM*> pJavaVm = &_jvm;
+ if (env->GetJavaVM(pJavaVm) != 0) {
+ ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr);
+ }
+ _jobjectSuspendedTask = reinterpret_cast<jobject>(env->NewGlobalRef(jobjectSuspendedTask));
+
+ jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask);
+ jfieldID jidTaskId = env->GetFieldID(jclassSuspendedTask, "taskId", "Ljava/lang/String;");
+ _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env->GetObjectField(_jobjectSuspendedTask, jidTaskId)));
+ ManagedLog::LOGGER->LogStop("SuspendedTaskClr2Java::SuspendedTaskClr2Java");
+ }
+
+ IActiveContextClr2Java^ SuspendedTaskClr2Java::GetActiveContext() {
+ ManagedLog::LOGGER->LogStart("SuspendedTaskClr2Java::GetActiveContext");
+ JNIEnv *env = RetrieveEnv(_jvm);
+
+ jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask);
+ jfieldID jidActiveContext = env->GetFieldID(jclassSuspendedTask, "jactiveContext", "Lorg/apache/reef/javabridge/ActiveContextBridge;");
+ jobject jobjectActiveContext = env->GetObjectField(_jobjectSuspendedTask, jidActiveContext);
+ ManagedLog::LOGGER->LogStop("SuspendedTaskClr2Java::GetActiveContext");
+ return gcnew ActiveContextClr2Java(env, jobjectActiveContext);
+ }
+
+ String^ SuspendedTaskClr2Java::GetId() {
+ ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::GetId");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ return ManagedStringFromJavaString(env, _jstringId);
+ }
+
+ array<byte>^ SuspendedTaskClr2Java::Get() {
+ ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::Get");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ jclass jclassSuspendedTask = env->GetObjectClass (_jobjectSuspendedTask);
+ jmethodID jmidGet = env->GetMethodID(jclassSuspendedTask, "get", "()[B");
+
+ if (jmidGet == NULL) {
+ ManagedLog::LOGGER->Log("jmidGet is NULL");
+ return nullptr;
+ }
+ jbyteArray jMessage = (jbyteArray) env->CallObjectMethod(_jobjectSuspendedTask, jmidGet);
+ return ManagedByteArrayFromJavaByteArray(env, jMessage);
+ }
+
+ void SuspendedTaskClr2Java::OnError(String^ message) {
+ ManagedLog::LOGGER->Log("SuspendedTaskClr2Java::OnError");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ HandleClr2JavaError(env, message, _jobjectSuspendedTask);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp
new file mode 100644
index 0000000..d2f8286
--- /dev/null
+++ b/lang/cpp/reef-bridge-clr/src/main/Cpp/CppBridge/JavaClrBridge/TaskMessageClr2Java.cpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "Clr2JavaImpl.h"
+
+namespace Org {
+ namespace Apache {
+ namespace Reef {
+ namespace Driver {
+ namespace Bridge {
+ ref class ManagedLog {
+ internal:
+ static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>");
+ };
+
+ TaskMessageClr2Java::TaskMessageClr2Java(JNIEnv *env, jobject jtaskMessage) {
+ ManagedLog::LOGGER->LogStart("TaskMessageClr2Java::TaskMessageClr2Java");
+ pin_ptr<JavaVM*> pJavaVm = &_jvm;
+ if (env->GetJavaVM(pJavaVm) != 0) {
+ ManagedLog::LOGGER->LogError("Failed to get JavaVM", nullptr);
+ }
+ _jobjectTaskMessage = reinterpret_cast<jobject>(env->NewGlobalRef(jtaskMessage));
+
+ jclass jclassTaskMessage = env->GetObjectClass (_jobjectTaskMessage);
+ jfieldID jidTaskId = env->GetFieldID(jclassTaskMessage, "taskId", "Ljava/lang/String;");
+ _jstringId = reinterpret_cast<jstring>(env->NewGlobalRef(env->GetObjectField(_jobjectTaskMessage, jidTaskId)));
+ ManagedLog::LOGGER->LogStop("TaskMessageClr2Java::TaskMessageClr2Java");
+ }
+
+ void TaskMessageClr2Java::OnError(String^ message) {
+ ManagedLog::LOGGER->Log("TaskMessageClr2Java::OnError");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ HandleClr2JavaError(env, message, _jobjectTaskMessage);
+ }
+
+ String^ TaskMessageClr2Java::GetId() {
+ ManagedLog::LOGGER->Log("TaskMessageClr2Java::GetId");
+ JNIEnv *env = RetrieveEnv(_jvm);
+ return ManagedStringFromJavaString(env, _jstringId);
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar
----------------------------------------------------------------------
diff --git a/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar b/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar
index e43d8bf..9f25ae3 100644
Binary files a/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar and b/lang/cs/Tests/ReefTests/bin/reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar differ
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/pom.xml b/lang/java/reef-bridge-java/pom.xml
new file mode 100644
index 0000000..e93274d
--- /dev/null
+++ b/lang/java/reef-bridge-java/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>reef-bridge-java</artifactId>
+ <name>REEF Bridge Java</name>
+ <description>Bridge between JVM and CLR.</description>
+
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ <relativePath>../../..</relativePath>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-yarn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-checkpoint</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-webserver</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <addClasspath>false</addClasspath>
+ <classpathPrefix>lib/</classpathPrefix>
+ <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>process-classes</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <exportAntProperties>true</exportAntProperties>
+ <target>
+ <property name="runtime_classpath" refid="maven.compile.classpath"/>
+ <exec executable="javah">
+ <arg value="-cp"/>
+ <arg value="${runtime_classpath}"/>
+ <arg value="-d"/>
+ <arg value="${project.build.directory}/classes"/>
+ <arg value="org.apache.reef.javabridge.NativeInterop"/>
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
new file mode 100644
index 0000000..a0dedf5
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ActiveContextBridge extends NativeBridge implements Identifiable {
+ private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
+
+ private ActiveContext jactiveContext;
+
+ private AvroConfigurationSerializer serializer;
+
+ private String contextId;
+
+ private String evaluatorId;
+
+ public ActiveContextBridge(ActiveContext activeContext) {
+ jactiveContext = activeContext;
+ serializer = new AvroConfigurationSerializer();
+ contextId = activeContext.getId();
+ evaluatorId = activeContext.getEvaluatorId();
+ }
+
+ public void submitTaskString(final String taskConfigurationString) {
+
+ if (taskConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty taskConfigurationString provided.");
+ }
+ ClassHierarchy clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+ Configuration taskConfiguration;
+ try {
+ taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy);
+ } catch (final Exception e) {
+ final String message = "Unable to de-serialize CLR task configurations using class hierarchy.";
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ jactiveContext.submitTask(taskConfiguration);
+ }
+
+ public String getEvaluatorDescriptorSring() {
+ final String descriptorString = Utilities.getEvaluatorDescriptorString(jactiveContext.getEvaluatorDescriptor());
+ LOG.log(Level.FINE, "active context - serialized evaluator descriptor: " + descriptorString);
+ return descriptorString;
+ }
+
+ @Override
+ public void close() {
+ jactiveContext.close();
+ }
+
+ @Override
+ public String getId() {
+ return contextId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
new file mode 100644
index 0000000..5d88355
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class AllocatedEvaluatorBridge extends NativeBridge {
+
+ private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorBridge.class.getName());
+
+ private final AllocatedEvaluator jallocatedEvaluator;
+ private final AvroConfigurationSerializer serializer;
+ private final ClassHierarchy clrClassHierarchy;
+ private final String evaluatorId;
+ private final String nameServerInfo;
+
+ public AllocatedEvaluatorBridge(final AllocatedEvaluator allocatedEvaluator, final String serverInfo) {
+ jallocatedEvaluator = allocatedEvaluator;
+ serializer = new AvroConfigurationSerializer();
+ clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+ evaluatorId = allocatedEvaluator.getId();
+ nameServerInfo = serverInfo;
+ }
+
+ public void submitContextAndTaskString(final String contextConfigurationString, final String taskConfigurationString) {
+ if (contextConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty contextConfigurationString provided.");
+ }
+ if (taskConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty taskConfigurationString provided.");
+ }
+ Configuration contextConfiguration;
+ Configuration taskConfiguration;
+ try {
+ contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy);
+ taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy);
+ } catch (final Exception e) {
+ final String message = "Unable to de-serialize CLR context or task configurations using class hierarchy.";
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ jallocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration);
+ }
+
+ public void submitContextString(final String contextConfigurationString) {
+ if (contextConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty contextConfigurationString provided.");
+ }
+ Configuration contextConfiguration;
+ try {
+ contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy);
+ } catch (final Exception e) {
+ final String message = "Unable to de-serialize CLR context configurations using class hierarchy.";
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ jallocatedEvaluator.submitContext(contextConfiguration);
+ }
+
+ public void submitContextAndServiceString(final String contextConfigurationString, final String serviceConfigurationString) {
+ if (contextConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty contextConfigurationString provided.");
+ }
+ if (serviceConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty serviceConfigurationString provided.");
+ }
+
+ Configuration contextConfiguration;
+ Configuration servicetConfiguration;
+ try {
+ contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy);
+ servicetConfiguration = serializer.fromString(serviceConfigurationString, clrClassHierarchy);
+ } catch (final Exception e) {
+ final String message = "Unable to de-serialize CLR context or service configurations using class hierarchy.";
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ jallocatedEvaluator.submitContextAndService(contextConfiguration, servicetConfiguration);
+ }
+
+ public void submitContextAndServiceAndTaskString(
+ final String contextConfigurationString,
+ final String serviceConfigurationString,
+ final String taskConfigurationString) {
+ if (contextConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty contextConfigurationString provided.");
+ }
+ if (serviceConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty serviceConfigurationString provided.");
+ }
+ if (taskConfigurationString.isEmpty()) {
+ throw new RuntimeException("empty taskConfigurationString provided.");
+ }
+ Configuration contextConfiguration;
+ Configuration servicetConfiguration;
+ Configuration taskConfiguration;
+ try {
+ contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy);
+ servicetConfiguration = serializer.fromString(serviceConfigurationString, clrClassHierarchy);
+ taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy);
+ } catch (final Exception e) {
+ final String message = "Unable to de-serialize CLR context or service or task configurations using class hierarchy.";
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ jallocatedEvaluator.submitContextAndServiceAndTask(contextConfiguration, servicetConfiguration, taskConfiguration);
+ }
+
+ public String getEvaluatorDescriptorSring() {
+ String descriptorString = Utilities.getEvaluatorDescriptorString(jallocatedEvaluator.getEvaluatorDescriptor());
+ LOG.log(Level.INFO, "allocated evaluator - serialized evaluator descriptor: " + descriptorString);
+ return descriptorString;
+ }
+
+ @Override
+ public void close() {
+ jallocatedEvaluator.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
new file mode 100644
index 0000000..62f9ce7
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ClosedContextBridge extends NativeBridge implements ClosedContext {
+
+ private static final Logger LOG = Logger.getLogger(ClosedContextBridge.class.getName());
+
+ private final ClosedContext jcloseContext;
+ private final ActiveContextBridge parentContext;
+ private final String contextId;
+ private final String evaluatorId;
+ private final EvaluatorDescriptor evaluatorDescriptor;
+
+ public ClosedContextBridge(final ClosedContext closedContext) {
+ jcloseContext = closedContext;
+ parentContext = new ActiveContextBridge(closedContext.getParentContext());
+ contextId = closedContext.getId();
+ evaluatorId = closedContext.getEvaluatorId();
+ evaluatorDescriptor = closedContext.getEvaluatorDescriptor();
+ }
+
+ @Override
+ public String getId() {
+ return contextId;
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return evaluatorId;
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ return Optional.of(parentContext.getId());
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return evaluatorDescriptor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ public String getEvaluatorDescriptorSring() {
+ String descriptorString = Utilities.getEvaluatorDescriptorString(evaluatorDescriptor);
+ LOG.log(Level.INFO, "Closed Context - serialized evaluator descriptor: " + descriptorString);
+ return descriptorString;
+ }
+
+ @Override
+ public ActiveContext getParentContext() {
+ return jcloseContext.getParentContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java
new file mode 100644
index 0000000..0e300fd
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedEvaluatorBridge.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.io.naming.Identifiable;
+
+public class CompletedEvaluatorBridge extends NativeBridge implements Identifiable {
+
+ private final CompletedEvaluator jcompletedEvaluator;
+
+ private final String evaluatorId;
+
+ public CompletedEvaluatorBridge(CompletedEvaluator completedEvaluator) {
+ jcompletedEvaluator = completedEvaluator;
+ evaluatorId = completedEvaluator.getId();
+ }
+
+ @Override
+ public String getId() {
+ return evaluatorId;
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
new file mode 100644
index 0000000..c95ca14
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.task.CompletedTask;
+
+public class CompletedTaskBridge extends NativeBridge {
+
+ private CompletedTask jcompletedTask;
+
+ private String taskId;
+
+ private ActiveContextBridge jactiveContext;
+
+ public CompletedTaskBridge(CompletedTask completedTask) {
+ jcompletedTask = completedTask;
+ taskId = completedTask.getId();
+ jactiveContext = new ActiveContextBridge(completedTask.getActiveContext());
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
new file mode 100644
index 0000000..eca4ba8
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ContextMessageBridge.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ContextMessage;
+
+public class ContextMessageBridge extends NativeBridge implements ContextMessage {
+
+ private ContextMessage jcontextMessage;
+ private String contextMessageId;
+ private String messageSourceId;
+ private byte[] message;
+
+ public ContextMessageBridge(ContextMessage contextMessage) {
+ jcontextMessage = contextMessage;
+ contextMessageId = contextMessage.getId();
+ messageSourceId = contextMessage.getMessageSourceID();
+ message = contextMessage.get();
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+
+ @Override
+ public byte[] get() {
+ return message;
+ }
+
+ @Override
+ public String getId() {
+ return contextMessageId;
+ }
+
+ @Override
+ public String getMessageSourceID() {
+ return messageSourceId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
new file mode 100644
index 0000000..a712fc4
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class EvaluatorRequestorBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName());
+ private final boolean isBlocked;
+ private final EvaluatorRequestor jevaluatorRequestor;
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ // accumulate how many evaluators have been submitted through this instance
+ // of EvaluatorRequestorBridge
+ private int clrEvaluatorsNumber;
+
+ public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked, final LoggingScopeFactory loggingScopeFactory) {
+ this.jevaluatorRequestor = evaluatorRequestor;
+ this.clrEvaluatorsNumber = 0;
+ this.isBlocked = isBlocked;
+ this.loggingScopeFactory = loggingScopeFactory;
+ }
+
+ public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, final String rack) {
+ if (this.isBlocked) {
+ throw new RuntimeException("Cannot request additional Evaluator, this is probably because the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
+ }
+
+ if (rack != null && !rack.isEmpty()) {
+ LOG.log(Level.WARNING, "Ignoring rack preference.");
+ }
+
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) {
+ clrEvaluatorsNumber += evaluatorsNumber;
+
+ final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+ .setNumber(evaluatorsNumber)
+ .setMemory(memory)
+ .setNumberOfCores(virtualCore)
+ .build();
+
+ LOG.log(Level.FINE, "submitting evaluator request {0}", request);
+ jevaluatorRequestor.submit(request);
+ }
+ }
+
+ public int getEvaluatorNumber() {
+ return clrEvaluatorsNumber;
+ }
+
+ @Override
+ public void close() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
new file mode 100644
index 0000000..dfed7f7
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ContextBase;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class FailedContextBridge extends NativeBridge implements ContextBase {
+
+ private static final Logger LOG = Logger.getLogger(FailedContextBridge.class.getName());
+
+ private final ActiveContextBridge parentContext;
+ private final EvaluatorDescriptor evaluatorDescriptor;
+ private final String evaluatorId;
+ private final String contextId;
+ private final String parentContextId;
+ private final FailedContext jfailedContext;
+
+ public FailedContextBridge(final FailedContext failedContext) {
+ jfailedContext = failedContext;
+ evaluatorDescriptor = failedContext.getEvaluatorDescriptor();
+ evaluatorId = failedContext.getEvaluatorId();
+ contextId = failedContext.getId();
+ parentContext = failedContext.getParentContext().isPresent() ?
+ new ActiveContextBridge(failedContext.getParentContext().get()) : null;
+ parentContextId = parentContext != null ? parentContext.getId() : null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public String getId() {
+ return contextId;
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return evaluatorId;
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ if (parentContextId != null) {
+ return Optional.of(parentContextId);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return evaluatorDescriptor;
+ }
+
+ public String getEvaluatorDescriptorSring() {
+ String descriptorString = Utilities.getEvaluatorDescriptorString(evaluatorDescriptor);
+ LOG.log(Level.INFO, "Failed Context - serialized evaluator descriptor: " + descriptorString);
+ return descriptorString;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
new file mode 100644
index 0000000..bae4946
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.util.logging.Logger;
+
+public class FailedEvaluatorBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(FailedEvaluatorBridge.class.getName());
+ private FailedEvaluator jfailedEvaluator;
+ private EvaluatorRequestorBridge evaluatorRequestorBridge;
+ private String evaluatorId;
+
+ public FailedEvaluatorBridge(FailedEvaluator failedEvaluator, EvaluatorRequestor evaluatorRequestor, boolean blockedForAdditionalEvaluator, final LoggingScopeFactory loggingScopeFactory) {
+ jfailedEvaluator = failedEvaluator;
+ evaluatorId = failedEvaluator.getId();
+ evaluatorRequestorBridge = new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory);
+ }
+
+ public int getNewlyRequestedEvaluatorNumber() {
+ return evaluatorRequestorBridge.getEvaluatorNumber();
+ }
+
+ @Override
+ public void close() {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
new file mode 100644
index 0000000..30383ca
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class FailedTaskBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName());
+
+ private FailedTask jfailedTask;
+ private ActiveContextBridge jactiveContext;
+
+ public FailedTaskBridge(FailedTask failedTask) {
+ jfailedTask = failedTask;
+ Optional<ActiveContext> activeContext = failedTask.getActiveContext();
+ jactiveContext = activeContext.isPresent() ? new ActiveContextBridge(activeContext.get()) : null;
+ }
+
+ public String getFailedTaskString() {
+ final String description = jfailedTask.getDescription().isPresent() ? jfailedTask.getDescription().get().replace("=", "").replace(",", "") : "";
+ final String cause = jfailedTask.getReason().isPresent() ? jfailedTask.getReason().get().toString().replace("=", "").replace(",", "") : "";
+ final String data = jfailedTask.getData().isPresent() ? new String(jfailedTask.getData().get()).replace("=", "").replace(",", "") : "";
+
+ // TODO: deserialize/serialize with proper Avro schema
+ final String poorSerializedString = "Identifier=" + jfailedTask.getId().replace("=", "").replace(",", "")
+ + ", Message=" + jfailedTask.getMessage().replace("=", "").replace(",", "")
+ + ", Description=" + description
+ + ", Cause=" + cause
+ + ", Data=" + data;
+
+ LOG.log(Level.INFO, "serialized failed task " + poorSerializedString);
+ return poorSerializedString;
+ }
+
+ @Override
+ public void close() {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
new file mode 100644
index 0000000..3e8a4e5
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+public final class HttpServerEventBridge extends NativeBridge {
+ private String queryString;
+ private byte[] queryRequestData;
+ private byte[] queryResponseData;
+ private String queryResult;
+ private String uriSpecification;
+
+ public HttpServerEventBridge(final String queryStr) {
+ this.queryString = queryStr;
+ }
+
+ public HttpServerEventBridge(final byte[] queryRequestData) {
+ this.queryRequestData = queryRequestData;
+ }
+
+ public final String getQueryString() {
+ return queryString;
+ }
+
+ public final void setQueryString(final String queryStr) {
+ this.queryString = queryStr;
+ }
+
+ public final String getQueryResult() {
+ return queryResult;
+ }
+
+ public final void setQueryResult(final String queryResult) {
+ this.queryResult = queryResult;
+ }
+
+ public final String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ public final void setUriSpecification(final String uriSpecification) {
+ this.uriSpecification = uriSpecification;
+ }
+
+ public final byte[] getQueryRequestData() {
+ return queryRequestData;
+ }
+
+ public final void setQueryRequestData(final byte[] queryRequestData) {
+ this.queryRequestData = queryRequestData;
+ }
+
+ public final byte[] getQueryResponseData() {
+ return queryResponseData;
+ }
+
+ public final void setQueryResponseData(final byte[] responseData) {
+ queryResponseData = responseData;
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
new file mode 100644
index 0000000..8bfbdfa
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class InteropLogger {
+ private static final Logger LOG = Logger.getLogger("InteropLogger");
+ HashMap<Integer, Level> levelHashMap;
+
+ {
+ levelHashMap = new HashMap<Integer, Level>();
+ levelHashMap.put(Level.OFF.intValue(), Level.OFF);
+ levelHashMap.put(Level.SEVERE.intValue(), Level.SEVERE);
+ levelHashMap.put(Level.WARNING.intValue(), Level.WARNING);
+ levelHashMap.put(Level.INFO.intValue(), Level.INFO);
+
+ levelHashMap.put(Level.CONFIG.intValue(), Level.CONFIG);
+ levelHashMap.put(Level.FINE.intValue(), Level.FINE);
+ levelHashMap.put(Level.FINER.intValue(), Level.FINER);
+
+ levelHashMap.put(Level.FINEST.intValue(), Level.FINEST);
+ levelHashMap.put(Level.ALL.intValue(), Level.ALL);
+ }
+
+ public void Log(int intLevel, String message) {
+ if (levelHashMap.containsKey(intLevel)) {
+ Level level = levelHashMap.get(intLevel);
+ LOG.log(level, message);
+ } else {
+
+ LOG.log(Level.WARNING, "Level " + intLevel + " is not a valid Log level");
+ LOG.log(Level.WARNING, message);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
new file mode 100644
index 0000000..8ef59d6
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.ArrayList;
+
+public class InteropReturnInfo {
+
+ int returnCode;
+ ArrayList<String> exceptionList = new ArrayList<String>();
+
+ public void addExceptionString(String exceptionString) {
+ exceptionList.add(exceptionString);
+ }
+
+ public boolean hasExceptions() {
+ return !exceptionList.isEmpty();
+ }
+
+ public ArrayList<String> getExceptionList() {
+ return exceptionList;
+ }
+
+ public int getReturnCode() {
+ return returnCode;
+ }
+
+ public void setReturnCode(int rc) {
+ returnCode = rc;
+ }
+
+ public void reset() {
+ exceptionList = new ArrayList<String>();
+ returnCode = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
new file mode 100644
index 0000000..ba438d8
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+public class JavaBridge {
+ private final static String CPP_BRIDGE = "JavaClrBridge";
+
+ static {
+ try {
+ System.loadLibrary(CPP_BRIDGE);
+ } catch (UnsatisfiedLinkError e) {
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
new file mode 100644
index 0000000..fa8b459
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.javabridge;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Loading CLR libraries
+ */
+public class LibLoader {
+
+ private static final Logger LOG = Logger.getLogger(LibLoader.class.getName());
+
+ private static final String LIB_BIN = "/";
+ private static final String DLL_EXTENSION = ".dll";
+ private static final String USER_DIR = "user.dir";
+ private static final String[] MANAGED_DLLS = {
+ "ClrHandler",
+ "msvcr110",
+ };
+
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ private final REEFFileNames reefFileNames;
+
+ @Inject
+ private LibLoader(final LoggingScopeFactory loggingScopeFactory, final REEFFileNames reefFileNames) {
+ this.loggingScopeFactory = loggingScopeFactory;
+ this.reefFileNames = reefFileNames;
+ }
+
+ /**
+ * Load CLR libraries
+ */
+ public void loadLib() throws IOException {
+ LOG.log(Level.INFO, "Loading DLLs for driver at time {0}." + new Date().toString());
+ try (final LoggingScope lb = loggingScopeFactory.loadLib()) {
+ final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir();
+ LOG.log(Level.INFO, "load Folder: " + tempLoadDir);
+ new File(tempLoadDir).mkdir();
+
+ loadFromReefJar(this.reefFileNames.getCppBridge(), false);
+
+ loadLibFromGlobal();
+
+ for (int i = 0; i < MANAGED_DLLS.length; i++) {
+ loadFromReefJar(MANAGED_DLLS[i], true);
+ }
+ }
+ LOG.log(Level.INFO, "Done loading DLLs for Driver at time {0}." + new Date().toString());
+ }
+
+ /**
+ * Load assemblies at global folder
+ */
+ private void loadLibFromGlobal() {
+ final String globalFilePath = System.getProperty(USER_DIR) + this.reefFileNames.getReefGlobal();
+ final File[] files = new File(globalFilePath).listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(DLL_EXTENSION);
+ }
+ });
+
+ LOG.log(Level.INFO, "Total dll files to load from {0} is {1}.", new Object[] {globalFilePath, files.length} );
+ for (int i = 0; i < files.length; i++) {
+ try {
+ LOG.log(Level.INFO, "file to load : " + files[i].toString());
+ NativeInterop.loadClrAssembly(files[i].toString());
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "exception in loading dll library: ", files[i].toString());
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Get file from jar file and copy it to temp dir and loads the library to memory
+ **/
+ private void loadFromReefJar(String name, final boolean managed) throws IOException {
+
+ name = name + DLL_EXTENSION;
+ try {
+ File fileOut = null;
+ // get input file from jar
+ final String path = this.reefFileNames.getReefDriverAppDllDir() + name;
+ LOG.log(Level.INFO, "Source file path: " + path);
+ final java.net.URL url = NativeInterop.class.getClass().getResource(path);
+ if (url != null) {
+ LOG.log(Level.INFO, "Source file: " + url.getPath());
+ }
+ try (final InputStream in = NativeInterop.class.getResourceAsStream(path)) {
+ //copy to /reef/CLRLoadingDirectory
+ final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir();
+ fileOut = new File(tempLoadDir + LIB_BIN + name);
+ LOG.log(Level.INFO, "Destination file: " + fileOut.toString());
+ if (null == in) {
+ LOG.log(Level.WARNING, "Cannot find " + path);
+ return;
+ }
+ try (final OutputStream out = new FileOutputStream(fileOut) ) {
+ IOUtils.copy(in, out);
+ }
+ }
+ loadAssembly(fileOut, managed);
+ } catch (final FileNotFoundException e) {
+ LOG.log(Level.SEVERE, "File not find exception: ", name);
+ throw e;
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "File copy error: ", name);
+ throw e;
+ }
+ }
+
+ /**
+ * load assembly
+ * @param fileOut
+ * @param managed
+ */
+ private void loadAssembly(final File fileOut, final boolean managed) {
+ if (managed) {
+ NativeInterop.loadClrAssembly(fileOut.toString());
+ LOG.log(Level.INFO, "Loading DLL managed done");
+ } else {
+ System.load(fileOut.toString());
+ LOG.log(Level.INFO, "Loading DLL not managed done");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
new file mode 100644
index 0000000..4249ba7
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public abstract class NativeBridge implements AutoCloseable {
+
+ private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
+
+ public void onError(String errorMessage) {
+ LOG.log(Level.SEVERE, "Bridge received error from CLR: " + errorMessage);
+ throw new RuntimeException("Bridge received error from CLR: " + errorMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
new file mode 100644
index 0000000..9fe61c1
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.HashMap;
+
+public class NativeInterop {
+ public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin";
+ public static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt";
+ public static final String EvaluatorRequestorKey = "EvaluatorRequestor";
+ public static final String AllocatedEvaluatorKey = "AllocatedEvaluator";
+ public static final String ActiveContextKey = "ActiveContext";
+ public static final String TaskMessageKey = "TaskMessage";
+ public static final String FailedTaskKey = "FailedTask";
+ public static final String FailedEvaluatorKey = "FailedEvaluator";
+ public static final String HttpServerKey = "HttpServerKey";
+ public static final String CompletedTaskKey = "CompletedTask";
+ public static final String RunningTaskKey = "RunningTask";
+ public static final String SuspendedTaskKey = "SuspendedTask";
+ public static final String CompletedEvaluatorKey = "CompletedEvaluator";
+ public static final String ClosedContextKey = "ClosedContext";
+ public static final String FailedContextKey = "FailedContext";
+ public static final String ContextMessageKey = "ContextMessage";
+ public static final String DriverRestartKey = "DriverRestart";
+ public static final String DriverRestartActiveContextKey = "DriverRestartActiveContext";
+ public static final String DriverRestartRunningTaskKey = "DriverRestartRunningTask";
+ public static final HashMap<String, Integer> Handlers = new HashMap<String, Integer>() {
+ {
+ put(EvaluatorRequestorKey, 0);
+ put(AllocatedEvaluatorKey, 1);
+ put(ActiveContextKey, 2);
+ put(TaskMessageKey, 3);
+ put(FailedTaskKey, 4);
+ put(FailedEvaluatorKey, 5);
+ put(HttpServerKey, 6);
+ put(CompletedTaskKey, 7);
+ put(RunningTaskKey, 8);
+ put(SuspendedTaskKey, 9);
+ put(CompletedEvaluatorKey, 10);
+ put(ClosedContextKey, 11);
+ put(FailedContextKey, 12);
+ put(ContextMessageKey, 13);
+ put(DriverRestartKey, 14);
+ put(DriverRestartActiveContextKey, 15);
+ put(DriverRestartRunningTaskKey, 16);
+ }
+ };
+
+ public static final int nHandlers = 17;
+
+ public static native void loadClrAssembly(String filePath);
+
+ public static native void ClrBufferedLog(int level, String message);
+
+ public static native long[] CallClrSystemOnStartHandler(String dateTime, String httpServerPortNumber);
+
+ public static native void ClrSystemAllocatedEvaluatorHandlerOnNext(
+ long handle,
+ AllocatedEvaluatorBridge javaEvaluatorBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemActiveContextHandlerOnNext(
+ long handle,
+ ActiveContextBridge javaActiveContextBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemEvaluatorRequstorHandlerOnNext(
+ long handle,
+ EvaluatorRequestorBridge javaEvluatorRequstorBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemTaskMessageHandlerOnNext(
+ long handle,
+ byte[] mesage,
+ TaskMessageBridge javaTaskMessageBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemFailedTaskHandlerOnNext(
+ long handle,
+ FailedTaskBridge failedTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemHttpServerHandlerOnNext(
+ long handle,
+ HttpServerEventBridge httpServerEventBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemFailedEvaluatorHandlerOnNext(
+ long handle,
+ FailedEvaluatorBridge failedEvaluatorBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemCompletedTaskHandlerOnNext(
+ long handle,
+ CompletedTaskBridge completedTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemRunningTaskHandlerOnNext(
+ long handle,
+ RunningTaskBridge runningTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemSupendedTaskHandlerOnNext(
+ long handle,
+ SuspendedTaskBridge suspendedTaskBridge
+ );
+
+ public static native void ClrSystemCompletdEvaluatorHandlerOnNext(
+ long handle,
+ CompletedEvaluatorBridge completedEvaluatorBridge
+ );
+
+ public static native void ClrSystemClosedContextHandlerOnNext(
+ long handle,
+ ClosedContextBridge closedContextBridge
+ );
+
+ public static native void ClrSystemFailedContextHandlerOnNext(
+ long handle,
+ FailedContextBridge failedContextBridge
+ );
+
+ public static native void ClrSystemContextMessageHandlerOnNext(
+ long handle,
+ ContextMessageBridge contextMessageBridge
+ );
+
+ public static native void ClrSystemDriverRestartHandlerOnNext(
+ long handle
+ );
+
+ public static native void ClrSystemDriverRestartActiveContextHandlerOnNext(
+ long handle,
+ ActiveContextBridge activeContextBridge
+ );
+
+ public static native void ClrSystemDriverRestartRunningTaskHandlerOnNext(
+ long handle,
+ RunningTaskBridge runningTaskBridge
+ );
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
new file mode 100644
index 0000000..301c4fc
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+
+import java.util.logging.Logger;
+
+public class RunningTaskBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(RunningTaskBridge.class.getName());
+
+ final private RunningTask jrunningTask;
+ final private ActiveContextBridge jactiveContext;
+
+ public RunningTaskBridge(RunningTask runningTask) {
+ jrunningTask = runningTask;
+ final ActiveContext activeContext = runningTask.getActiveContext();
+ jactiveContext = new ActiveContextBridge(activeContext);
+ }
+
+ public final String getId() {
+ return jrunningTask.getId();
+ }
+
+ public final void send(final byte[] message) {
+ jrunningTask.send(message);
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
new file mode 100644
index 0000000..16fa3d3
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.io.Message;
+import org.apache.reef.io.naming.Identifiable;
+
+public class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
+
+ private final SuspendedTask jsuspendedTask;
+ private final String taskId;
+ private final ActiveContextBridge jactiveContext;
+
+ public SuspendedTaskBridge(SuspendedTask suspendedTask) {
+ jsuspendedTask = suspendedTask;
+ taskId = suspendedTask.getId();
+ jactiveContext = new ActiveContextBridge(jsuspendedTask.getActiveContext());
+ }
+
+ public ActiveContextBridge getActiveContext() {
+ return jactiveContext;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public String getId() {
+ return taskId;
+ }
+
+ @Override
+ public byte[] get() {
+ return jsuspendedTask.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
new file mode 100644
index 0000000..25b0478
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.task.TaskMessage;
+
+public class TaskMessageBridge extends NativeBridge {
+ private TaskMessage jtaskMessage;
+ private String taskId;
+
+ // we don't really need to pass this around, just have this as place holder for future.
+ public TaskMessageBridge(TaskMessage taskMessage) {
+ jtaskMessage = taskMessage;
+ taskId = taskMessage.getId();
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/22f651f8/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
new file mode 100644
index 0000000..e6d0849
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
+import org.apache.reef.tang.proto.ClassHierarchyProto;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class Utilities {
+ public static ClassHierarchy loadClassHierarchy(String classHierarchyFile) {
+ Path p = Paths.get(classHierarchyFile);
+ if (!Files.exists(p)) {
+ p = Paths.get(System.getProperty("user.dir") + "/reef/global/" + classHierarchyFile);
+ }
+ if (!Files.exists(p)) {
+ throw new RuntimeException("cannot find file " + p.toAbsolutePath());
+ }
+ try (final InputStream chin = new FileInputStream(p.toAbsolutePath().toString())) {
+ final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin);
+ final ClassHierarchy ch = new ProtocolBufferClassHierarchy(root);
+ return ch;
+ } catch (final IOException e) {
+ final String message = "Unable to load class hierarchy from " + classHierarchyFile;
+ throw new RuntimeException(message, e);
+ }
+ }
+
+ public static String getEvaluatorDescriptorString(EvaluatorDescriptor evaluatorDescriptor) {
+ InetSocketAddress socketAddress = evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress();
+ return "IP=" + socketAddress.getAddress() + ", Port=" + socketAddress.getPort() + ", HostName=" + socketAddress.getHostName() + ", Memory=" + evaluatorDescriptor.getMemory() + ", Core=" + evaluatorDescriptor.getNumberOfCores();
+ }
+}