You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:20 UTC
[47/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
new file mode 100644
index 0000000..a712fc4
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public final class EvaluatorRequestorBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName());
+ private final boolean isBlocked;
+ private final EvaluatorRequestor jevaluatorRequestor;
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ // accumulate how many evaluators have been submitted through this instance
+ // of EvaluatorRequestorBridge
+ private int clrEvaluatorsNumber;
+
+ public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked, final LoggingScopeFactory loggingScopeFactory) {
+ this.jevaluatorRequestor = evaluatorRequestor;
+ this.clrEvaluatorsNumber = 0;
+ this.isBlocked = isBlocked;
+ this.loggingScopeFactory = loggingScopeFactory;
+ }
+
+ public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, final String rack) {
+ if (this.isBlocked) {
+ throw new RuntimeException("Cannot request additional Evaluator, this is probably because the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
+ }
+
+ if (rack != null && !rack.isEmpty()) {
+ LOG.log(Level.WARNING, "Ignoring rack preference.");
+ }
+
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) {
+ clrEvaluatorsNumber += evaluatorsNumber;
+
+ final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+ .setNumber(evaluatorsNumber)
+ .setMemory(memory)
+ .setNumberOfCores(virtualCore)
+ .build();
+
+ LOG.log(Level.FINE, "submitting evaluator request {0}", request);
+ jevaluatorRequestor.submit(request);
+ }
+ }
+
+ public int getEvaluatorNumber() {
+ return clrEvaluatorsNumber;
+ }
+
+ @Override
+ public void close() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
new file mode 100644
index 0000000..dfed7f7
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ContextBase;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Bridge view of a {@link FailedContext}: the identifiers and evaluator descriptor are
+ * copied out of the failed context at construction time and served from fields afterwards.
+ */
+public class FailedContextBridge extends NativeBridge implements ContextBase {
+
+  private static final Logger LOG = Logger.getLogger(FailedContextBridge.class.getName());
+
+  private final ActiveContextBridge parentContext;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+  private final String evaluatorId;
+  private final String contextId;
+  private final String parentContextId;
+  private final FailedContext jfailedContext;
+
+  /**
+   * @param failedContext the failed context to wrap; when it reports a parent context,
+   *                      that parent is wrapped in an {@link ActiveContextBridge}
+   */
+  public FailedContextBridge(final FailedContext failedContext) {
+    this.jfailedContext = failedContext;
+    this.evaluatorDescriptor = failedContext.getEvaluatorDescriptor();
+    this.evaluatorId = failedContext.getEvaluatorId();
+    this.contextId = failedContext.getId();
+
+    if (failedContext.getParentContext().isPresent()) {
+      this.parentContext = new ActiveContextBridge(failedContext.getParentContext().get());
+      this.parentContextId = this.parentContext.getId();
+    } else {
+      this.parentContext = null;
+      this.parentContextId = null;
+    }
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() throws Exception {
+  }
+
+  /** @return id of the failed context */
+  @Override
+  public String getId() {
+    return this.contextId;
+  }
+
+  /** @return id of the evaluator as reported by the failed context */
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorId;
+  }
+
+  /** @return the parent context id, or an empty Optional when there was no parent */
+  @Override
+  public Optional<String> getParentId() {
+    if (this.parentContextId == null) {
+      return Optional.empty();
+    }
+    return Optional.of(this.parentContextId);
+  }
+
+  /** @return evaluator descriptor captured from the failed context */
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  /**
+   * Produces the string form of the evaluator descriptor via
+   * {@code Utilities.getEvaluatorDescriptorString} and logs it at INFO.
+   *
+   * @return the descriptor string
+   */
+  public String getEvaluatorDescriptorSring() {
+    final String serialized = Utilities.getEvaluatorDescriptorString(this.evaluatorDescriptor);
+    LOG.log(Level.INFO, "Failed Context - serialized evaluator descriptor: " + serialized);
+    return serialized;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
new file mode 100644
index 0000000..bae4946
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.util.logging.Logger;
+
+public class FailedEvaluatorBridge extends NativeBridge {
+ private static final Logger LOG = Logger.getLogger(FailedEvaluatorBridge.class.getName());
+ private FailedEvaluator jfailedEvaluator;
+ private EvaluatorRequestorBridge evaluatorRequestorBridge;
+ private String evaluatorId;
+
+ public FailedEvaluatorBridge(FailedEvaluator failedEvaluator, EvaluatorRequestor evaluatorRequestor, boolean blockedForAdditionalEvaluator, final LoggingScopeFactory loggingScopeFactory) {
+ jfailedEvaluator = failedEvaluator;
+ evaluatorId = failedEvaluator.getId();
+ evaluatorRequestorBridge = new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory);
+ }
+
+ public int getNewlyRequestedEvaluatorNumber() {
+ return evaluatorRequestorBridge.getEvaluatorNumber();
+ }
+
+ @Override
+ public void close() {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
new file mode 100644
index 0000000..30383ca
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.util.Optional;
+
+import java.nio.charset.StandardCharsets;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Bridge for a {@link FailedTask}. Wraps the task's active context (when present) in an
+ * {@link ActiveContextBridge} and can render the failure as a flat key/value string.
+ */
+public class FailedTaskBridge extends NativeBridge {
+  private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName());
+
+  private FailedTask jfailedTask;
+  private ActiveContextBridge jactiveContext;
+
+  /**
+   * @param failedTask the failed task to wrap
+   */
+  public FailedTaskBridge(FailedTask failedTask) {
+    jfailedTask = failedTask;
+    Optional<ActiveContext> activeContext = failedTask.getActiveContext();
+    jactiveContext = activeContext.isPresent() ? new ActiveContextBridge(activeContext.get()) : null;
+  }
+
+  /**
+   * Renders the failed task as
+   * {@code Identifier=..., Message=..., Description=..., Cause=..., Data=...}.
+   * Every value has '=' and ',' removed so that it cannot be confused with the
+   * separators; absent optional values are rendered as the empty string.
+   *
+   * @return the flattened representation, which is also logged at INFO
+   */
+  public String getFailedTaskString() {
+    final String description = jfailedTask.getDescription().isPresent()
+        ? stripSeparators(jfailedTask.getDescription().get()) : "";
+    final String cause = jfailedTask.getReason().isPresent()
+        ? stripSeparators(jfailedTask.getReason().get().toString()) : "";
+    // Decode with an explicit charset so the result does not depend on the JVM's
+    // platform default. NOTE(review): assumes the payload is UTF-8 text - confirm
+    // against whatever produces the task data.
+    final String data = jfailedTask.getData().isPresent()
+        ? stripSeparators(new String(jfailedTask.getData().get(), StandardCharsets.UTF_8)) : "";
+
+    // TODO: deserialize/serialize with proper Avro schema
+    final String poorSerializedString = "Identifier=" + stripSeparators(jfailedTask.getId())
+        + ", Message=" + stripSeparators(jfailedTask.getMessage())
+        + ", Description=" + description
+        + ", Cause=" + cause
+        + ", Data=" + data;
+
+    LOG.log(Level.INFO, "serialized failed task " + poorSerializedString);
+    return poorSerializedString;
+  }
+
+  /**
+   * Removes the characters used as separators in the flattened form.
+   *
+   * @param value raw value, may be null
+   * @return the value without '=' and ',', or the empty string for null
+   */
+  private static String stripSeparators(final String value) {
+    if (value == null) {
+      return "";
+    }
+    return value.replace("=", "").replace(",", "");
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
new file mode 100644
index 0000000..3e8a4e5
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/HttpServerEventBridge.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+/**
+ * Mutable holder for the pieces of one HTTP server event: a query string, raw request
+ * and response bytes, a query result and a URI specification. Byte arrays are stored
+ * and returned by reference, without copying.
+ */
+public final class HttpServerEventBridge extends NativeBridge {
+  private String queryString;
+  private byte[] queryRequestData;
+  private byte[] queryResponseData;
+  private String queryResult;
+  private String uriSpecification;
+
+  /**
+   * Creates an event that initially carries only a query string.
+   *
+   * @param queryStr the query string
+   */
+  public HttpServerEventBridge(final String queryStr) {
+    queryString = queryStr;
+  }
+
+  /**
+   * Creates an event that initially carries only raw request bytes.
+   *
+   * @param queryRequestData the request bytes
+   */
+  public HttpServerEventBridge(final byte[] queryRequestData) {
+    this.queryRequestData = queryRequestData;
+  }
+
+  // ---- query string ----
+
+  public final String getQueryString() {
+    return this.queryString;
+  }
+
+  public final void setQueryString(final String queryStr) {
+    queryString = queryStr;
+  }
+
+  // ---- query result ----
+
+  public final String getQueryResult() {
+    return this.queryResult;
+  }
+
+  public final void setQueryResult(final String queryResult) {
+    this.queryResult = queryResult;
+  }
+
+  // ---- URI specification ----
+
+  public final String getUriSpecification() {
+    return this.uriSpecification;
+  }
+
+  public final void setUriSpecification(final String uriSpecification) {
+    this.uriSpecification = uriSpecification;
+  }
+
+  // ---- raw request bytes ----
+
+  public final byte[] getQueryRequestData() {
+    return this.queryRequestData;
+  }
+
+  public final void setQueryRequestData(final byte[] queryRequestData) {
+    this.queryRequestData = queryRequestData;
+  }
+
+  // ---- raw response bytes ----
+
+  public final byte[] getQueryResponseData() {
+    return this.queryResponseData;
+  }
+
+  public final void setQueryResponseData(final byte[] responseData) {
+    this.queryResponseData = responseData;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
new file mode 100644
index 0000000..8bfbdfa
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropLogger.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.HashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Logs messages on the "InteropLogger" logger at a level given as an integer,
+ * translating the integer through a table of the standard
+ * {@link java.util.logging.Level} values.
+ */
+public class InteropLogger {
+  private static final Logger LOG = Logger.getLogger("InteropLogger");
+  HashMap<Integer, Level> levelHashMap;
+
+  {
+    // Table from Level.intValue() to the Level itself, for all standard levels.
+    levelHashMap = new HashMap<Integer, Level>();
+    final Level[] standardLevels = {
+        Level.OFF, Level.SEVERE, Level.WARNING, Level.INFO,
+        Level.CONFIG, Level.FINE, Level.FINER,
+        Level.FINEST, Level.ALL,
+    };
+    for (final Level standard : standardLevels) {
+      levelHashMap.put(standard.intValue(), standard);
+    }
+  }
+
+  /**
+   * Logs {@code message} at the level whose integer value is {@code intLevel}.
+   * When the integer matches no known level, a warning naming the bad value is
+   * logged and the message itself is logged at WARNING.
+   *
+   * @param intLevel integer value of the desired level
+   * @param message  text to log
+   */
+  public void Log(int intLevel, String message) {
+    final Level resolved = levelHashMap.get(intLevel);
+    if (resolved == null) {
+      LOG.log(Level.WARNING, "Level " + intLevel + " is not a valid Log level");
+      LOG.log(Level.WARNING, message);
+      return;
+    }
+    LOG.log(resolved, message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
new file mode 100644
index 0000000..8ef59d6
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/InteropReturnInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.ArrayList;
+
+/**
+ * Carries the outcome of an interop call: an integer return code plus a list of
+ * exception strings collected along the way.
+ */
+public class InteropReturnInfo {
+
+  int returnCode;
+  ArrayList<String> exceptionList = new ArrayList<String>();
+
+  /**
+   * Appends one exception description to the list.
+   *
+   * @param exceptionString text describing the exception
+   */
+  public void addExceptionString(String exceptionString) {
+    this.exceptionList.add(exceptionString);
+  }
+
+  /** @return true when at least one exception string has been recorded */
+  public boolean hasExceptions() {
+    return this.exceptionList.size() > 0;
+  }
+
+  /** @return the live list of recorded exception strings (not a copy) */
+  public ArrayList<String> getExceptionList() {
+    return this.exceptionList;
+  }
+
+  /** @return the current return code */
+  public int getReturnCode() {
+    return this.returnCode;
+  }
+
+  /** @param rc the return code to store */
+  public void setReturnCode(int rc) {
+    this.returnCode = rc;
+  }
+
+  /**
+   * Returns this object to its initial state. The exception list is replaced by a
+   * new empty list rather than cleared, so a list handed out earlier is left untouched.
+   */
+  public void reset() {
+    this.returnCode = 0;
+    this.exceptionList = new ArrayList<String>();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
new file mode 100644
index 0000000..ba438d8
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/JavaBridge.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+/**
+ * Attempts to load the native bridge library named {@code JavaClrBridge} when the
+ * class is initialized.
+ */
+public class JavaBridge {
+  private final static String CPP_BRIDGE = "JavaClrBridge";
+
+  static {
+    try {
+      System.loadLibrary(CPP_BRIDGE);
+    } catch (UnsatisfiedLinkError e) {
+      // Loading stays best-effort (class initialization must not fail), but the
+      // failure is recorded instead of vanishing so a missing library can be diagnosed.
+      java.util.logging.Logger.getLogger(JavaBridge.class.getName()).log(
+          java.util.logging.Level.INFO,
+          "Unable to load native library " + CPP_BRIDGE + " via System.loadLibrary", e);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
new file mode 100644
index 0000000..fa8b459
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/LibLoader.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.javabridge;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Loads the native bridge DLL and the CLR assemblies used by the driver.
+ * DLLs packaged as class-path resources are first copied into a load folder
+ * below the working directory ({@code user.dir}) and loaded from there.
+ */
+public class LibLoader {
+
+  private static final Logger LOG = Logger.getLogger(LibLoader.class.getName());
+
+  private static final String LIB_BIN = "/";
+  private static final String DLL_EXTENSION = ".dll";
+  private static final String USER_DIR = "user.dir";
+  private static final String[] MANAGED_DLLS = {
+      "ClrHandler",
+      "msvcr110",
+  };
+
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  private final REEFFileNames reefFileNames;
+
+  @Inject
+  private LibLoader(final LoggingScopeFactory loggingScopeFactory, final REEFFileNames reefFileNames) {
+    this.loggingScopeFactory = loggingScopeFactory;
+    this.reefFileNames = reefFileNames;
+  }
+
+  /**
+   * Loads, in order: the C++ bridge DLL (via {@link System#load}), every DLL found in
+   * the global folder, and the DLLs listed in {@code MANAGED_DLLS}.
+   *
+   * @throws IOException if copying a DLL out of the class path fails
+   */
+  public void loadLib() throws IOException {
+    // The timestamp is passed as a parameter so the {0} placeholder is actually filled in.
+    LOG.log(Level.INFO, "Loading DLLs for driver at time {0}.", new Date().toString());
+    try (final LoggingScope lb = loggingScopeFactory.loadLib()) {
+      final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir();
+      LOG.log(Level.INFO, "load Folder: " + tempLoadDir);
+      final File loadDir = new File(tempLoadDir);
+      // mkdir() also returns false when the folder already exists; only warn when it is really missing.
+      if (!loadDir.mkdir() && !loadDir.isDirectory()) {
+        LOG.log(Level.WARNING, "Unable to create load folder: " + tempLoadDir);
+      }
+
+      loadFromReefJar(this.reefFileNames.getCppBridge(), false);
+
+      loadLibFromGlobal();
+
+      for (final String managedDll : MANAGED_DLLS) {
+        loadFromReefJar(managedDll, true);
+      }
+    }
+    LOG.log(Level.INFO, "Done loading DLLs for Driver at time {0}.", new Date().toString());
+  }
+
+  /**
+   * Loads every {@code .dll} file in the global folder as a CLR assembly.
+   * A missing or unreadable folder is logged and skipped.
+   */
+  private void loadLibFromGlobal() {
+    final String globalFilePath = System.getProperty(USER_DIR) + this.reefFileNames.getReefGlobal();
+    final File[] files = new File(globalFilePath).listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.toLowerCase().endsWith(DLL_EXTENSION);
+      }
+    });
+
+    // listFiles() returns null when the path is not a directory or cannot be read.
+    if (files == null) {
+      LOG.log(Level.WARNING, "Cannot list dll files in {0}; nothing loaded from there.", globalFilePath);
+      return;
+    }
+
+    LOG.log(Level.INFO, "Total dll files to load from {0} is {1}.", new Object[] {globalFilePath, files.length});
+    for (final File file : files) {
+      try {
+        LOG.log(Level.INFO, "file to load : " + file.toString());
+        NativeInterop.loadClrAssembly(file.toString());
+      } catch (final Exception e) {
+        // Log both the file and the exception; the exception is then propagated unchanged.
+        LOG.log(Level.SEVERE, "exception in loading dll library: " + file.toString(), e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Copies the named DLL from the class path into the load folder and loads it.
+   * When the resource does not exist a warning is logged and nothing is loaded.
+   *
+   * @param name    DLL name without the {@code .dll} extension
+   * @param managed true to load as a CLR assembly, false to load with {@link System#load}
+   * @throws IOException if the copy fails
+   **/
+  private void loadFromReefJar(String name, final boolean managed) throws IOException {
+
+    final String fileName = name + DLL_EXTENSION;
+    try {
+      File fileOut = null;
+      // get input file from jar
+      final String path = this.reefFileNames.getReefDriverAppDllDir() + fileName;
+      LOG.log(Level.INFO, "Source file path: " + path);
+      // Resolve against NativeInterop itself, the same class used to open the stream below.
+      final java.net.URL url = NativeInterop.class.getResource(path);
+      if (url != null) {
+        LOG.log(Level.INFO, "Source file: " + url.getPath());
+      }
+      try (final InputStream in = NativeInterop.class.getResourceAsStream(path)) {
+        //copy to /reef/CLRLoadingDirectory
+        final String tempLoadDir = System.getProperty(USER_DIR) + this.reefFileNames.getLoadDir();
+        fileOut = new File(tempLoadDir + LIB_BIN + fileName);
+        LOG.log(Level.INFO, "Destination file: " + fileOut.toString());
+        if (null == in) {
+          LOG.log(Level.WARNING, "Cannot find " + path);
+          return;
+        }
+        try (final OutputStream out = new FileOutputStream(fileOut)) {
+          IOUtils.copy(in, out);
+        }
+      }
+      loadAssembly(fileOut, managed);
+    } catch (final FileNotFoundException e) {
+      LOG.log(Level.SEVERE, "File not found: " + fileName, e);
+      throw e;
+    } catch (IOException e) {
+      LOG.log(Level.SEVERE, "File copy error: " + fileName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Loads one DLL file.
+   *
+   * @param fileOut the DLL on disk
+   * @param managed true to load it as a CLR assembly, false to load it with {@link System#load}
+   */
+  private void loadAssembly(final File fileOut, final boolean managed) {
+    if (managed) {
+      NativeInterop.loadClrAssembly(fileOut.toString());
+      LOG.log(Level.INFO, "Loading DLL managed done");
+    } else {
+      System.load(fileOut.toString());
+      LOG.log(Level.INFO, "Loading DLL not managed done");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
new file mode 100644
index 0000000..4249ba7
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeBridge.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public abstract class NativeBridge implements AutoCloseable {
+
+ private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
+
+ public void onError(String errorMessage) {
+ LOG.log(Level.SEVERE, "Bridge received error from CLR: " + errorMessage);
+ throw new RuntimeException("Bridge received error from CLR: " + errorMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
new file mode 100644
index 0000000..9fe61c1
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import java.util.HashMap;
+
+/**
+ * Declares the native entry points, file names and handler keys used by the bridge.
+ * All methods here are {@code native}: their implementations live outside this Java source.
+ * NOTE(review): several native method names are misspelled (e.g. "Requstor", "Supended",
+ * "Completd"); the native side presumably binds to these exact names, so they must not be
+ * renamed here without changing the native implementation as well - confirm.
+ */
+public class NativeInterop {
+ // File names used by the bridge; how they are resolved is decided by the callers.
+ public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin";
+ public static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt";
+ // Keys of the Handlers map below, one per handler kind.
+ public static final String EvaluatorRequestorKey = "EvaluatorRequestor";
+ public static final String AllocatedEvaluatorKey = "AllocatedEvaluator";
+ public static final String ActiveContextKey = "ActiveContext";
+ public static final String TaskMessageKey = "TaskMessage";
+ public static final String FailedTaskKey = "FailedTask";
+ public static final String FailedEvaluatorKey = "FailedEvaluator";
+ public static final String HttpServerKey = "HttpServerKey";
+ public static final String CompletedTaskKey = "CompletedTask";
+ public static final String RunningTaskKey = "RunningTask";
+ public static final String SuspendedTaskKey = "SuspendedTask";
+ public static final String CompletedEvaluatorKey = "CompletedEvaluator";
+ public static final String ClosedContextKey = "ClosedContext";
+ public static final String FailedContextKey = "FailedContext";
+ public static final String ContextMessageKey = "ContextMessage";
+ public static final String DriverRestartKey = "DriverRestart";
+ public static final String DriverRestartActiveContextKey = "DriverRestartActiveContext";
+ public static final String DriverRestartRunningTaskKey = "DriverRestartRunningTask";
+ /**
+ * Maps each handler key above to a distinct index in the range 0..16.
+ * NOTE(review): presumably these are positions in the long[] returned by
+ * {@link #CallClrSystemOnStartHandler} - confirm against the code that consumes it.
+ * The map is a publicly visible, mutable HashMap.
+ */
+ public static final HashMap<String, Integer> Handlers = new HashMap<String, Integer>() {
+ {
+ put(EvaluatorRequestorKey, 0);
+ put(AllocatedEvaluatorKey, 1);
+ put(ActiveContextKey, 2);
+ put(TaskMessageKey, 3);
+ put(FailedTaskKey, 4);
+ put(FailedEvaluatorKey, 5);
+ put(HttpServerKey, 6);
+ put(CompletedTaskKey, 7);
+ put(RunningTaskKey, 8);
+ put(SuspendedTaskKey, 9);
+ put(CompletedEvaluatorKey, 10);
+ put(ClosedContextKey, 11);
+ put(FailedContextKey, 12);
+ put(ContextMessageKey, 13);
+ put(DriverRestartKey, 14);
+ put(DriverRestartActiveContextKey, 15);
+ put(DriverRestartRunningTaskKey, 16);
+ }
+ };
+
+ // Equals the number of entries put into Handlers above; the two are kept in sync by hand.
+ public static final int nHandlers = 17;
+
+ // Native: takes the path of a file. Judging by the name it loads a CLR assembly.
+ public static native void loadClrAssembly(String filePath);
+
+ // Native: takes an integer level and a message. Judging by the name it logs on the CLR side.
+ public static native void ClrBufferedLog(int level, String message);
+
+ // Native: returns an array of longs.
+ // NOTE(review): presumably one handle per entry of Handlers - confirm.
+ public static native long[] CallClrSystemOnStartHandler(String dateTime, String httpServerPortNumber);
+
+ // The methods below all take a long handle plus the bridge object for the event.
+ // NOTE(review): the handle is presumably one of the values returned by
+ // CallClrSystemOnStartHandler - confirm against the callers.
+ public static native void ClrSystemAllocatedEvaluatorHandlerOnNext(
+ long handle,
+ AllocatedEvaluatorBridge javaEvaluatorBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemActiveContextHandlerOnNext(
+ long handle,
+ ActiveContextBridge javaActiveContextBridge,
+ InteropLogger interopLogger
+ );
+
+ // Name is misspelled ("Requstor"); see the class comment before renaming.
+ public static native void ClrSystemEvaluatorRequstorHandlerOnNext(
+ long handle,
+ EvaluatorRequestorBridge javaEvluatorRequstorBridge,
+ InteropLogger interopLogger
+ );
+
+ // Unlike its siblings this method also receives a byte array in addition to the bridge object.
+ public static native void ClrSystemTaskMessageHandlerOnNext(
+ long handle,
+ byte[] mesage,
+ TaskMessageBridge javaTaskMessageBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemFailedTaskHandlerOnNext(
+ long handle,
+ FailedTaskBridge failedTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemHttpServerHandlerOnNext(
+ long handle,
+ HttpServerEventBridge httpServerEventBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemFailedEvaluatorHandlerOnNext(
+ long handle,
+ FailedEvaluatorBridge failedEvaluatorBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemCompletedTaskHandlerOnNext(
+ long handle,
+ CompletedTaskBridge completedTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ public static native void ClrSystemRunningTaskHandlerOnNext(
+ long handle,
+ RunningTaskBridge runningTaskBridge,
+ InteropLogger interopLogger
+ );
+
+ // From here on the methods take no InteropLogger argument.
+ // Name is misspelled ("Supended"); see the class comment before renaming.
+ public static native void ClrSystemSupendedTaskHandlerOnNext(
+ long handle,
+ SuspendedTaskBridge suspendedTaskBridge
+ );
+
+ // Name is misspelled ("Completd"); see the class comment before renaming.
+ public static native void ClrSystemCompletdEvaluatorHandlerOnNext(
+ long handle,
+ CompletedEvaluatorBridge completedEvaluatorBridge
+ );
+
+ public static native void ClrSystemClosedContextHandlerOnNext(
+ long handle,
+ ClosedContextBridge closedContextBridge
+ );
+
+ public static native void ClrSystemFailedContextHandlerOnNext(
+ long handle,
+ FailedContextBridge failedContextBridge
+ );
+
+ public static native void ClrSystemContextMessageHandlerOnNext(
+ long handle,
+ ContextMessageBridge contextMessageBridge
+ );
+
+ // Takes only the handle; no bridge object is passed.
+ public static native void ClrSystemDriverRestartHandlerOnNext(
+ long handle
+ );
+
+ public static native void ClrSystemDriverRestartActiveContextHandlerOnNext(
+ long handle,
+ ActiveContextBridge activeContextBridge
+ );
+
+ public static native void ClrSystemDriverRestartRunningTaskHandlerOnNext(
+ long handle,
+ RunningTaskBridge runningTaskBridge
+ );
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
new file mode 100644
index 0000000..301c4fc
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+
+import java.util.logging.Logger;
+
+/**
+ * Bridge object wrapping a {@link RunningTask} together with a bridge for its active context.
+ */
+public class RunningTaskBridge extends NativeBridge {
+  private static final Logger LOG = Logger.getLogger(RunningTaskBridge.class.getName());
+
+  // NOTE(review): these fields are never read in this class; they may be looked up by name
+  // from native code, so keep the names as they are - confirm before renaming.
+  private final RunningTask jrunningTask;
+  private final ActiveContextBridge jactiveContext;
+
+  /**
+   * Wraps the given task and creates a bridge for the context it runs on.
+   *
+   * @param runningTask the task to wrap.
+   */
+  public RunningTaskBridge(RunningTask runningTask) {
+    this.jrunningTask = runningTask;
+    final ActiveContext taskContext = runningTask.getActiveContext();
+    this.jactiveContext = new ActiveContextBridge(taskContext);
+  }
+
+  /**
+   * @return the id reported by the wrapped task.
+   */
+  public final String getId() {
+    return this.jrunningTask.getId();
+  }
+
+  /**
+   * Forwards the message to the wrapped task.
+   *
+   * @param message the bytes to send.
+   */
+  public final void send(final byte[] message) {
+    this.jrunningTask.send(message);
+  }
+
+  /**
+   * Does nothing; this bridge holds nothing that needs releasing.
+   */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
new file mode 100644
index 0000000..16fa3d3
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.io.Message;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Bridge object wrapping a {@link SuspendedTask}: exposes its id, its message bytes
+ * and a bridge for the context it belongs to.
+ */
+public class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
+
+  // NOTE(review): field names may be looked up by name from native code - confirm before renaming.
+  private final SuspendedTask jsuspendedTask;
+  private final String taskId;
+  private final ActiveContextBridge jactiveContext;
+
+  /**
+   * Wraps the given task, caching its id and creating a bridge for its active context.
+   *
+   * @param suspendedTask the task to wrap.
+   */
+  public SuspendedTaskBridge(SuspendedTask suspendedTask) {
+    this.jsuspendedTask = suspendedTask;
+    this.taskId = suspendedTask.getId();
+    this.jactiveContext = new ActiveContextBridge(this.jsuspendedTask.getActiveContext());
+  }
+
+  /**
+   * @return the bridge created for the task's active context.
+   */
+  public ActiveContextBridge getActiveContext() {
+    return this.jactiveContext;
+  }
+
+  /**
+   * Does nothing; this bridge holds nothing that needs releasing.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * @return the task id captured when this bridge was constructed.
+   */
+  @Override
+  public String getId() {
+    return this.taskId;
+  }
+
+  /**
+   * @return the bytes returned by the wrapped task's {@code get()}.
+   */
+  @Override
+  public byte[] get() {
+    return this.jsuspendedTask.get();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
new file mode 100644
index 0000000..25b0478
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/TaskMessageBridge.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.task.TaskMessage;
+
+/**
+ * Bridge object holding a {@link TaskMessage} and the id it reports.
+ */
+public class TaskMessageBridge extends NativeBridge {
+  // NOTE(review): neither field is read in this class; they may be accessed by name
+  // from native code - confirm before renaming or removing.
+  private TaskMessage jtaskMessage;
+  private String taskId;
+
+  /**
+   * Stores the message and the id it reports.
+   * The bridge is not strictly needed yet; it is kept as a placeholder for future use.
+   *
+   * @param taskMessage the message to hold.
+   */
+  public TaskMessageBridge(TaskMessage taskMessage) {
+    this.jtaskMessage = taskMessage;
+    this.taskId = taskMessage.getId();
+  }
+
+  /**
+   * Does nothing; this bridge holds nothing that needs releasing.
+   */
+  @Override
+  public void close() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
new file mode 100644
index 0000000..e6d0849
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/Utilities.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.implementation.protobuf.ProtocolBufferClassHierarchy;
+import org.apache.reef.tang.proto.ClassHierarchyProto;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Static helpers used by the bridge.
+ */
+public class Utilities {
+  /**
+   * Loads a class hierarchy from a protocol-buffer file.
+   * The file is looked up first at the given path and, if it does not exist there,
+   * under {@code <user.dir>/reef/global/}.
+   *
+   * @param classHierarchyFile path (or bare file name) of the serialized class hierarchy.
+   * @return the class hierarchy parsed from the file.
+   * @throws RuntimeException if the file is found in neither location, or if reading it fails
+   *                          (the I/O error is attached as the cause).
+   */
+  public static ClassHierarchy loadClassHierarchy(String classHierarchyFile) {
+    Path p = Paths.get(classHierarchyFile);
+    if (!Files.exists(p)) {
+      p = Paths.get(System.getProperty("user.dir") + "/reef/global/" + classHierarchyFile);
+    }
+    if (!Files.exists(p)) {
+      throw new RuntimeException("cannot find file " + p.toAbsolutePath());
+    }
+    final Path resolved = p.toAbsolutePath();
+    try (final InputStream chin = new FileInputStream(resolved.toString())) {
+      final ClassHierarchyProto.Node root = ClassHierarchyProto.Node.parseFrom(chin);
+      return new ProtocolBufferClassHierarchy(root);
+    } catch (final IOException e) {
+      // Report the file that was actually opened, which may be the fallback location
+      // rather than the path the caller passed in.
+      throw new RuntimeException("Unable to load class hierarchy from " + resolved, e);
+    }
+  }
+
+  /**
+   * Formats the node address and resources of an evaluator as
+   * {@code IP=..., Port=..., HostName=..., Memory=..., Core=...}.
+   *
+   * @param evaluatorDescriptor the descriptor to format.
+   * @return the formatted description.
+   */
+  public static String getEvaluatorDescriptorString(EvaluatorDescriptor evaluatorDescriptor) {
+    InetSocketAddress socketAddress = evaluatorDescriptor.getNodeDescriptor().getInetSocketAddress();
+    return "IP=" + socketAddress.getAddress()
+        + ", Port=" + socketAddress.getPort()
+        + ", HostName=" + socketAddress.getHostName()
+        + ", Memory=" + evaluatorDescriptor.getMemory()
+        + ", Core=" + evaluatorDescriptor.getNumberOfCores();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
new file mode 100644
index 0000000..62bfac1
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.client.*;
+import org.apache.reef.io.network.naming.NameServerConfiguration;
+import org.apache.reef.javabridge.NativeInterop;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.webserver.HttpHandlerConfiguration;
+import org.apache.reef.webserver.HttpServerReefEventHandler;
+import org.apache.reef.webserver.ReefEventStateManager;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Clr Bridge Client.
+ */
+@Unit
+public class JobClient {
+
+ /**
+ * Standard java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(JobClient.class.getName());
+
+ /**
+ * Reference to the REEF framework.
+ * This variable is injected automatically in the constructor.
+ */
+ private final REEF reef;
+
+ /**
+ * Job Driver configuration.
+ */
+ private Configuration driverConfiguration;
+ private ConfigurationModule driverConfigModule;
+
+ /**
+ * A reference to the running job that allows client to send messages back to the job driver
+ */
+ private RunningJob runningJob;
+
+ /**
+ * Set to false when job driver is done.
+ */
+ private boolean isBusy = true;
+
+ private int driverMemory;
+
+ private String driverId;
+
+ private String jobSubmissionDirectory = "reefTmp/job_" + System.currentTimeMillis();
+
+ /**
+ * A factory that provides LoggingScope
+ */
+ private final LoggingScopeFactory loggingScopeFactory;
+ /**
+ * Clr Bridge client.
+ * Parameters are injected automatically by TANG.
+ *
+ * @param reef Reference to the REEF framework.
+ */
+ @Inject
+ JobClient(final REEF reef, final LoggingScopeFactory loggingScopeFactory) throws BindException {
+ this.loggingScopeFactory = loggingScopeFactory;
+ this.reef = reef;
+ this.driverConfigModule = getDriverConfiguration();
+ }
+
+ public static ConfigurationModule getDriverConfiguration() {
+ return EnvironmentUtils.addClasspath(DriverConfiguration.CONF, DriverConfiguration.GLOBAL_LIBRARIES)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, JobDriver.AllocatedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_FAILED, JobDriver.FailedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_ACTIVE, JobDriver.ActiveContextHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, JobDriver.DriverRestartActiveContextHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_CLOSED, JobDriver.ClosedContextHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_FAILED, JobDriver.FailedContextHandler.class)
+ .set(DriverConfiguration.ON_CONTEXT_MESSAGE, JobDriver.ContextMessageHandler.class)
+ .set(DriverConfiguration.ON_TASK_MESSAGE, JobDriver.TaskMessageHandler.class)
+ .set(DriverConfiguration.ON_TASK_FAILED, JobDriver.FailedTaskHandler.class)
+ .set(DriverConfiguration.ON_TASK_RUNNING, JobDriver.RunningTaskHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, JobDriver.DriverRestartRunningTaskHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_RESTART_COMPLETED, JobDriver.DriverRestartCompletedHandler.class)
+ .set(DriverConfiguration.ON_TASK_COMPLETED, JobDriver.CompletedTaskHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_DRIVER_RESTARTED, JobDriver.RestartHandler.class)
+ .set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class);
+ }
+
+ private static Configuration getNameServerConfiguration() {
+ return NameServerConfiguration.CONF
+ .set(NameServerConfiguration.NAME_SERVICE_PORT, 0)
+ .build();
+ }
+
+ /**
+ * @return the driver-side configuration to be merged into the DriverConfiguration to enable the HTTP server.
+ */
+ public static Configuration getHTTPConfiguration() {
+ Configuration httpHandlerConfiguration = HttpHandlerConfiguration.CONF
+ .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class)
+ .build();
+
+ Configuration driverConfigurationForHttpServer = DriverServiceConfiguration.CONF
+ .set(DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED, ReefEventStateManager.AllocatedEvaluatorStateHandler.class)
+ .set(DriverServiceConfiguration.ON_CONTEXT_ACTIVE, ReefEventStateManager.ActiveContextStateHandler.class)
+ .set(DriverServiceConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, ReefEventStateManager.DrivrRestartActiveContextStateHandler.class)
+ .set(DriverServiceConfiguration.ON_TASK_RUNNING, ReefEventStateManager.TaskRunningStateHandler.class)
+ .set(DriverServiceConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, ReefEventStateManager.DriverRestartTaskRunningStateHandler.class)
+ .set(DriverServiceConfiguration.ON_DRIVER_STARTED, ReefEventStateManager.StartStateHandler.class)
+ .set(DriverServiceConfiguration.ON_DRIVER_STOP, ReefEventStateManager.StopStateHandler.class)
+ .build();
+ return Configurations.merge(httpHandlerConfiguration, driverConfigurationForHttpServer);
+ }
+
+ public void addCLRFiles(final File folder) throws BindException {
+ try (final LoggingScope ls = this.loggingScopeFactory.getNewLoggingScope("JobClient::addCLRFiles")) {
+ ConfigurationModule result = this.driverConfigModule;
+ for (final File f : folder.listFiles()) {
+ if (f.canRead() && f.exists() && f.isFile()) {
+ result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
+ }
+ }
+
+ // set the driver memory, id and job submission directory
+ this.driverConfigModule = result
+ .set(DriverConfiguration.DRIVER_MEMORY, this.driverMemory)
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, this.driverId)
+ .set(DriverConfiguration.DRIVER_JOB_SUBMISSION_DIRECTORY, this.jobSubmissionDirectory);
+
+
+ Path globalLibFile = Paths.get(NativeInterop.GLOBAL_LIBRARIES_FILENAME);
+ if (!Files.exists(globalLibFile)) {
+ LOG.log(Level.FINE, "Cannot find global classpath file at: {0}, assume there is none.", globalLibFile.toAbsolutePath());
+ } else {
+ String globalLibString = "";
+ try {
+ globalLibString = new String(Files.readAllBytes(globalLibFile));
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "Cannot read from {0}, global libraries not added " + globalLibFile.toAbsolutePath());
+ }
+
+ for (final String s : globalLibString.split(",")) {
+ File f = new File(s);
+ this.driverConfigModule = this.driverConfigModule.set(DriverConfiguration.GLOBAL_LIBRARIES, f.getPath());
+ }
+ }
+
+ this.driverConfiguration = Configurations.merge(this.driverConfigModule.build(), getHTTPConfiguration(), getNameServerConfiguration());
+ }
+ }
+
+ /**
+ * Launch the job driver.
+ *
+ * @throws org.apache.reef.tang.exceptions.BindException configuration error.
+ */
+ public void submit(final File clrFolder, final boolean submitDriver, final Configuration clientConfig) {
+ try (final LoggingScope ls = this.loggingScopeFactory.driverSubmit(submitDriver)) {
+ try {
+ addCLRFiles(clrFolder);
+ } catch (final BindException e) {
+ LOG.log(Level.FINE, "Failed to bind", e);
+ }
+ if (submitDriver) {
+ this.reef.submit(this.driverConfiguration);
+ } else {
+ File driverConfig = new File(System.getProperty("user.dir") + "/driver.config");
+ try {
+ new AvroConfigurationSerializer().toFile(Configurations.merge(this.driverConfiguration, clientConfig), driverConfig);
+ LOG.log(Level.INFO, "Driver configuration file created at " + driverConfig.getAbsolutePath());
+ } catch (final IOException e) {
+ throw new RuntimeException("Cannot create driver configuration file at " + driverConfig.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ /**
+ * Set the driver memory
+ */
+ public void setDriverInfo(final String identifier, final int memory, final String jobSubmissionDirectory) {
+ if (identifier == null || identifier.isEmpty()) {
+ throw new RuntimeException("driver id cannot be null or empty");
+ }
+ if (memory <= 0) {
+ throw new RuntimeException("driver memory cannot be negative number: " + memory);
+ }
+ this.driverMemory = memory;
+ this.driverId = identifier;
+ if (jobSubmissionDirectory != null && !jobSubmissionDirectory.equals("empty")) {
+ this.jobSubmissionDirectory = jobSubmissionDirectory;
+ } else {
+ LOG.log(Level.FINE, "No job submission directory provided by CLR user, will use " + this.jobSubmissionDirectory);
+ }
+ }
+
+ /**
+ * Notify the process in waitForCompletion() method that the main process has finished.
+ */
+ private synchronized void stopAndNotify() {
+ this.runningJob = null;
+ this.isBusy = false;
+ this.notify();
+ }
+
+ /**
+ * Wait for the job driver to complete. This method is called from Launcher.main()
+ */
+ public void waitForCompletion(final int waitTime) {
+ LOG.info("Waiting for the Job Driver to complete: " + waitTime);
+ if (waitTime == 0) {
+ close(0);
+ return;
+ } else if (waitTime < 0) {
+ waitTillDone();
+ }
+ long endTime = System.currentTimeMillis() + waitTime * 1000;
+ close(endTime);
+ }
+
+ public void close(final long endTime) {
+ while (endTime > System.currentTimeMillis()) {
+ try {
+ Thread.sleep(1000);
+ } catch (final InterruptedException e) {
+ LOG.log(Level.SEVERE, "Thread sleep failed");
+ }
+ }
+ LOG.log(Level.INFO, "Done waiting.");
+ this.stopAndNotify();
+ reef.close();
+ }
+
+ private void waitTillDone() {
+ while (this.isBusy) {
+ try {
+ synchronized (this) {
+ this.wait();
+ }
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
+ }
+ }
+ this.reef.close();
+ }
+
+ /**
+ * Receive notification from the job driver that the job had failed.
+ */
+ final class FailedJobHandler implements EventHandler<FailedJob> {
+ @Override
+ public void onNext(final FailedJob job) {
+ LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getMessage());
+ stopAndNotify();
+ }
+ }
+
+ /**
+ * Receive notification from the job driver that the job had completed successfully.
+ */
+ final class CompletedJobHandler implements EventHandler<CompletedJob> {
+ @Override
+ public void onNext(final CompletedJob job) {
+ LOG.log(Level.INFO, "Completed job: {0}", job.getId());
+ stopAndNotify();
+ }
+ }
+
+ /**
+ * Receive notification that there was an exception thrown from the job driver.
+ */
+ final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+ @Override
+ public void onNext(final FailedRuntime error) {
+ LOG.log(Level.SEVERE, "Error in job driver: " + error, error.getMessage());
+ stopAndNotify();
+ }
+ }
+
+ final class WakeErrorHandler implements EventHandler<Throwable> {
+ @Override
+ public void onNext(Throwable error) {
+ LOG.log(Level.SEVERE, "Error communicating with job driver, exiting... ", error);
+ stopAndNotify();
+ }
+ }
+}