You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:52 UTC
[28/63] [abbrv] git commit: Adapt RPC to support primitive types as
parameters and return values.
Adapt RPC to support primitive types as parameters and return values.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/aa7550aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/aa7550aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/aa7550aa
Branch: refs/heads/master
Commit: aa7550aa2d05940eed695cfa7a1790197255e47d
Parents: 2d6199f
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 21 19:09:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200
----------------------------------------------------------------------
.../flink/types/JavaToValueConverter.java | 98 +++
.../java/org/apache/flink/util/ClassUtils.java | 32 +
.../flink/types/JavaToValueConverterTest.java | 96 +++
.../flink/runtime/instance/AllocatedSlot.java | 15 +-
.../org/apache/flink/runtime/ipc/Client.java | 14 +-
.../java/org/apache/flink/runtime/ipc/RPC.java | 166 ++--
.../org/apache/flink/runtime/ipc/Server.java | 17 +-
.../jobmanager/scheduler/DefaultScheduler.java | 800 +++----------------
.../jobmanager/scheduler/ScheduledUnit.java | 4 +
.../instance/LocalInstanceManagerTest.java | 10 +
.../org/apache/flink/runtime/ipc/RpcTest.java | 130 +++
11 files changed, 580 insertions(+), 802 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java b/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
new file mode 100644
index 0000000..1e07e3e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+public class JavaToValueConverter {
+
+ public static Value convertBoxedJavaType(Object boxed) {
+ if (boxed == null) {
+ return null;
+ }
+
+ final Class<?> clazz = boxed.getClass();
+
+ if (clazz == String.class) {
+ return new StringValue((String) boxed);
+ }
+ else if (clazz == Integer.class) {
+ return new IntValue((Integer) boxed);
+ }
+ else if (clazz == Long.class) {
+ return new LongValue((Long) boxed);
+ }
+ else if (clazz == Double.class) {
+ return new DoubleValue((Double) boxed);
+ }
+ else if (clazz == Float.class) {
+ return new FloatValue((Float) boxed);
+ }
+ else if (clazz == Boolean.class) {
+ return new BooleanValue((Boolean) boxed);
+ }
+ else if (clazz == Byte.class) {
+ return new ByteValue((Byte) boxed);
+ }
+ else if (clazz == Short.class) {
+ return new ShortValue((Short) boxed);
+ }
+ else if (clazz == Character.class) {
+ return new CharValue((Character) boxed);
+ }
+ else {
+ throw new IllegalArgumentException("Object is no primitive Java type.");
+ }
+ }
+
+ public static Object convertValueType(Value value) {
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof StringValue) {
+ return ((StringValue) value).getValue();
+ }
+ else if (value instanceof IntValue) {
+ return ((IntValue) value).getValue();
+ }
+ else if (value instanceof LongValue) {
+ return ((LongValue) value).getValue();
+ }
+ else if (value instanceof DoubleValue) {
+ return ((DoubleValue) value).getValue();
+ }
+ else if (value instanceof FloatValue) {
+ return ((FloatValue) value).getValue();
+ }
+ else if (value instanceof BooleanValue) {
+ return ((BooleanValue) value).getValue();
+ }
+ else if (value instanceof ByteValue) {
+ return ((ByteValue) value).getValue();
+ }
+ else if (value instanceof ShortValue) {
+ return ((ShortValue) value).getValue();
+ }
+ else if (value instanceof CharValue) {
+ return ((CharValue) value).getValue();
+ }
+ else {
+ throw new IllegalArgumentException("Object is no primitive Java type.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
index 03a3a23..00ad8e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
@@ -19,6 +19,9 @@
package org.apache.flink.util;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.protocols.VersionedProtocol;
@@ -93,4 +96,33 @@ public final class ClassUtils {
private static ClassLoader getClassLoader() {
return ClassUtils.class.getClassLoader();
}
+
+ public static Class<?> resolveClassPrimitiveAware(String className) throws ClassNotFoundException {
+ if (className == null) {
+ throw new NullPointerException();
+ }
+
+ Class<?> primClass = PRIMITIVE_TYPES.get(className);
+ if (primClass != null) {
+ return primClass;
+ } else {
+ return Class.forName(className);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final Map<String, Class<?>> PRIMITIVE_TYPES = new HashMap<String, Class<?>>(9);
+
+ static {
+ PRIMITIVE_TYPES.put("byte", byte.class);
+ PRIMITIVE_TYPES.put("short", short.class);
+ PRIMITIVE_TYPES.put("int", int.class);
+ PRIMITIVE_TYPES.put("long", long.class);
+ PRIMITIVE_TYPES.put("float", float.class);
+ PRIMITIVE_TYPES.put("double", double.class);
+ PRIMITIVE_TYPES.put("boolean", boolean.class);
+ PRIMITIVE_TYPES.put("char", char.class);
+ PRIMITIVE_TYPES.put("void", void.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java b/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
new file mode 100644
index 0000000..cef639e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Test;
+
+public class JavaToValueConverterTest {
+
+ @Test
+ public void testJavaToValueConversion() {
+ try {
+ assertNull(JavaToValueConverter.convertBoxedJavaType(null));
+
+ assertEquals(new StringValue("123Test"), JavaToValueConverter.convertBoxedJavaType("123Test"));
+ assertEquals(new ByteValue((byte) 44), JavaToValueConverter.convertBoxedJavaType((byte) 44));
+ assertEquals(new ShortValue((short) 10000), JavaToValueConverter.convertBoxedJavaType((short) 10000));
+ assertEquals(new IntValue(3567564), JavaToValueConverter.convertBoxedJavaType(3567564));
+ assertEquals(new LongValue(767692734), JavaToValueConverter.convertBoxedJavaType(767692734L));
+ assertEquals(new FloatValue(17.5f), JavaToValueConverter.convertBoxedJavaType(17.5f));
+ assertEquals(new DoubleValue(3.1415926), JavaToValueConverter.convertBoxedJavaType(3.1415926));
+ assertEquals(new BooleanValue(true), JavaToValueConverter.convertBoxedJavaType(true));
+ assertEquals(new CharValue('@'), JavaToValueConverter.convertBoxedJavaType('@'));
+
+ try {
+ JavaToValueConverter.convertBoxedJavaType(new ArrayList<Object>());
+ fail("Accepted invalid type.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testValueToJavaConversion() {
+ try {
+ assertNull(JavaToValueConverter.convertValueType(null));
+
+ assertEquals("123Test", JavaToValueConverter.convertValueType(new StringValue("123Test")));
+ assertEquals((byte) 44, JavaToValueConverter.convertValueType(new ByteValue((byte) 44)));
+ assertEquals((short) 10000, JavaToValueConverter.convertValueType(new ShortValue((short) 10000)));
+ assertEquals(3567564, JavaToValueConverter.convertValueType(new IntValue(3567564)));
+ assertEquals(767692734L, JavaToValueConverter.convertValueType(new LongValue(767692734)));
+ assertEquals(17.5f, JavaToValueConverter.convertValueType(new FloatValue(17.5f)));
+ assertEquals(3.1415926, JavaToValueConverter.convertValueType(new DoubleValue(3.1415926)));
+ assertEquals(true, JavaToValueConverter.convertValueType(new BooleanValue(true)));
+ assertEquals('@', JavaToValueConverter.convertValueType(new CharValue('@')));
+
+ try {
+ JavaToValueConverter.convertValueType(new MyValue());
+ fail("Accepted invalid type.");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static final class MyValue implements Value {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void write(DataOutputView out) {}
+
+ @Override
+ public void read(DataInputView in) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 71af9db..6289d45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.instance;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
@@ -38,6 +40,9 @@ public class AllocatedSlot {
/** The number of the slot on which the task is deployed */
private final int slotNumber;
+
+ /** Flag that marks the schedule as active */
+ private final AtomicBoolean active = new AtomicBoolean(true);
public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
@@ -72,8 +77,14 @@ public class AllocatedSlot {
// --------------------------------------------------------------------------------------------
- public void runTask(ExecutionVertex2 vertex) {
-
+ /**
+ * @param vertex
+ *
+ * @return True, if the task was scheduled correctly, false if the slot was asynchronously deallocated
+ * in the meantime.
+ */
+ public boolean runTask(ExecutionVertex2 vertex) {
+ return true;
}
public void cancelResource() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
index 87267fe..3c98e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
@@ -56,13 +56,7 @@ import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-/**
- * A client for an IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
- * a port and is defined by a parameter class and a value class.
- *
- * @see Server
- */
+
public class Client {
public static final Logger LOG = LoggerFactory.getLogger(Client.class);
@@ -633,9 +627,6 @@ public class Client {
}
}
- /**
- * Construct an IPC client whose values are of the given {@link Writable} class.
- */
public Client(final SocketFactory factory) {
this.maxIdleTime = 1000;
this.maxRetries = 10;
@@ -646,9 +637,6 @@ public class Client {
/**
* Construct an IPC client with the default SocketFactory
- *
- * @param valueClass
- * @param conf
*/
public Client() {
this(NetUtils.getDefaultSocketFactory());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
index 1768687..d127d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
@@ -46,47 +46,33 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.types.JavaToValueConverter;
+import org.apache.flink.types.Value;
import org.apache.flink.util.ClassUtils;
-/**
- * A simple RPC mechanism.
- * A <i>protocol</i> is a Java interface. All parameters and return types must
- * be one of:
- * <ul>
- * <li>a primitive type, <code>boolean</code>, <code>byte</code>, <code>char</code>, <code>short</code>,
- * <code>int</code>, <code>long</code>, <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- * <li>a {@link String}; or</li>
- * <li>a {@link Writable}; or</li>
- * <li>an array of the above types</li>
- * </ul>
- * All methods in the protocol should throw only IOException. No field data of
- * the protocol instance is transmitted.
- */
+
public class RPC {
private static final Logger LOG = LoggerFactory.getLogger(RPC.class);
- private RPC() {
- } // no public ctor
+ private RPC() {}
/** A method invocation, including the method name and its parameters. */
private static class Invocation implements IOReadableWritable {
private String methodName;
- private Class<? extends IOReadableWritable>[] parameterClasses;
+ private Class<?>[] parameterClasses;
- private IOReadableWritable[] parameters;
+ private Object[] parameters;
@SuppressWarnings("unused")
public Invocation() {
}
- // TODO: See if type safety can be improved here
- @SuppressWarnings("unchecked")
- public Invocation(Method method, IOReadableWritable[] parameters) {
+ public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
- this.parameterClasses = (Class<? extends IOReadableWritable>[]) method.getParameterTypes();
+ this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
}
@@ -96,21 +82,21 @@ public class RPC {
}
/** The parameter classes. */
- public Class<? extends IOReadableWritable>[] getParameterClasses() {
+ public Class<?>[] getParameterClasses() {
return parameterClasses;
}
/** The parameter instances. */
- public IOReadableWritable[] getParameters() {
+ public Object[] getParameters() {
return parameters;
}
- // TODO: See if type safety can be improved here
+
@SuppressWarnings("unchecked")
public void read(DataInputView in) throws IOException {
this.methodName = StringRecord.readString(in);
- this.parameters = new IOReadableWritable[in.readInt()];
+ this.parameters = new Object[in.readInt()];
this.parameterClasses = new Class[parameters.length];
for (int i = 0; i < parameters.length; i++) {
@@ -118,27 +104,28 @@ public class RPC {
// Read class name for parameter and try to get class to that name
final String className = StringRecord.readString(in);
try {
- parameterClasses[i] = ClassUtils.getRecordByName(className);
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe.toString());
+ parameterClasses[i] = ClassUtils.resolveClassPrimitiveAware(className);
+ }
+ catch (ClassNotFoundException e) {
+ throw new IOException(e);
}
// See if parameter is null
if (in.readBoolean()) {
+ IOReadableWritable value;
try {
final String parameterClassName = StringRecord.readString(in);
- final Class<? extends IOReadableWritable> parameterClass = ClassUtils
- .getRecordByName(parameterClassName);
- parameters[i] = parameterClass.newInstance();
- } catch (IllegalAccessException iae) {
- throw new IOException(iae.toString());
- } catch (InstantiationException ie) {
- throw new IOException(ie.toString());
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe.toString());
+ final Class<? extends IOReadableWritable> parameterClass =
+ (Class<? extends IOReadableWritable>) ClassUtils.resolveClassPrimitiveAware(parameterClassName);
+
+ value = parameterClass.newInstance();
+ parameters[i] = value;
+ }
+ catch (Exception e) {
+ throw new IOException(e);
}
// Object will do everything else on its own
- parameters[i].read(in);
+ value.read(in);
} else {
parameters[i] = null;
}
@@ -148,6 +135,8 @@ public class RPC {
public void write(DataOutputView out) throws IOException {
StringRecord.writeString(out, methodName);
out.writeInt(parameterClasses.length);
+
+ // at this point, type conversion should have happened
for (int i = 0; i < parameterClasses.length; i++) {
StringRecord.writeString(out, parameterClasses[i].getName());
if (parameters[i] == null) {
@@ -155,7 +144,39 @@ public class RPC {
} else {
out.writeBoolean(true);
StringRecord.writeString(out, parameters[i].getClass().getName());
- parameters[i].write(out);
+ ((IOReadableWritable) parameters[i]).write(out);
+ }
+ }
+ }
+
+ public void doTypeConversion() throws IOException {
+ try {
+ for (int i = 0; i < parameterClasses.length; i++) {
+ if (!IOReadableWritable.class.isAssignableFrom(parameterClasses[i])) {
+ try {
+ parameters[i] = JavaToValueConverter.convertBoxedJavaType(parameters[i]);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IOException("Argument " + i + " of method " + methodName
+ + " is not a primitive type (or boxed primitive) and not of type IOReadableWriteable");
+ }
+ }
+ }
+ }
+ catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ throw e;
+ }
+ catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new IOException(e);
+ }
+ }
+
+ public void undoTypeConversion() {
+ for (int i = 0; i < parameterClasses.length; i++) {
+ if (!IOReadableWritable.class.isAssignableFrom(parameterClasses[i])) {
+ parameters[i] = JavaToValueConverter.convertValueType((Value) parameters[i]);
}
}
}
@@ -180,14 +201,6 @@ public class RPC {
static private class ClientCache {
private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();
- /**
- * Construct & cache an IPC client with the user-provided SocketFactory
- * if no cached client exists.
- *
- * @param conf
- * Configuration
- * @return an IPC client
- */
private synchronized Client getClient(SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
@@ -236,26 +249,17 @@ public class RPC {
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-
- // TODO clean up
- IOReadableWritable[] castArgs = null;
- if (args != null) {
- castArgs = new IOReadableWritable[args.length];
-
- // Check if args are instances of ReadableWritable
- for (int i = 0; i < args.length; i++) {
- if ((args[i] != null) && !(args[i] instanceof IOReadableWritable)) {
- throw new IOException("Argument " + i + " of method " + method.getName()
- + " is not of type IOReadableWriteable");
- } else {
- castArgs[i] = (IOReadableWritable) args[i];
- }
- }
+ Invocation invocation = new Invocation(method, args);
+ invocation.doTypeConversion();
+
+ Object retValue = this.client.call(invocation, this.address, method.getDeclaringClass());
+
+ if (IOReadableWritable.class.isAssignableFrom(method.getReturnType())) {
+ return retValue;
+ }
+ else {
+ return JavaToValueConverter.convertValueType((Value) retValue);
}
- final IOReadableWritable value = this.client.call(new Invocation(method, castArgs), this.address, method
- .getDeclaringClass());
-
- return value;
}
/* close the IPC client that's responsible for this invoker's RPCs */
@@ -328,11 +332,6 @@ public class RPC {
/**
* Construct a client-side proxy object with the default SocketFactory
- *
- * @param protocol
- * @param addr
- * @return
- * @throws IOException
*/
public static <V extends VersionedProtocol> V getProxy(Class<V> protocol, InetSocketAddress addr)
throws IOException {
@@ -370,8 +369,6 @@ public class RPC {
*
* @param instance
* the instance whose methods will be called
- * @param conf
- * the configuration to use
* @param bindAddress
* the address to bind on to listen for connection
* @param port
@@ -395,8 +392,6 @@ public class RPC {
*
* @param instance
* the instance whose methods will be called
- * @param conf
- * the configuration to use
* @param bindAddress
* the address to bind on to listen for connection
* @param port
@@ -415,12 +410,26 @@ public class RPC {
try {
final Invocation call = (Invocation) param;
+ call.undoTypeConversion();
final Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
method.setAccessible(true);
final Object value = method.invoke((Object) instance, (Object[]) call.getParameters());
- return (IOReadableWritable) value;
+
+ if (IOReadableWritable.class.isAssignableFrom(method.getReturnType())) {
+ return (IOReadableWritable) value;
+ }
+ else {
+ try {
+ return JavaToValueConverter.convertBoxedJavaType(value);
+ }
+ catch (IllegalArgumentException e) {
+ throw new IOException("The return type of method " + method.getName()
+ + " is not a primitive type (or boxed primitive) and not of type IOReadableWriteable");
+ }
+ }
+
} catch (InvocationTargetException e) {
final Throwable target = e.getTargetException();
@@ -431,7 +440,8 @@ public class RPC {
ioe.setStackTrace(target.getStackTrace());
throw ioe;
}
- } catch (Throwable e) {
+ }
+ catch (Throwable e) {
final IOException ioe = new IOException(e.toString());
ioe.setStackTrace(e.getStackTrace());
throw ioe;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
index bde6847..3dcb5b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
@@ -67,13 +67,7 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.util.ClassUtils;
-/**
- * An abstract IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
- * a port and is defined by a parameter class and a value class.
- *
- * @see Client
- */
+
public abstract class Server {
public static final Logger LOG = LoggerFactory.getLogger(Server.class);
@@ -101,12 +95,7 @@ public abstract class Server {
return protocol;
}
- /**
- * Returns the server instance called under or null. May be called under {@link #call(Writable, long)}
- * implementations, and under {@link Writable} methods of paramters and return values. Permits applications to
- * access
- * the server context.
- */
+
public static Server get() {
return SERVER.get();
}
@@ -119,7 +108,7 @@ public abstract class Server {
/**
* Returns the remote side ip address when invoked inside an RPC
- * Returns null incase of an error.
+ * Returns null in case of an error.
*/
public static InetAddress getRemoteIp() {
Call call = CurCall.get();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index d23d35f..e038d7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.jobmanager.scheduler;
import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.HashMap;
+import java.util.ArrayDeque;
import java.util.HashSet;
-import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,9 @@ import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.jobgraph.JobID;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
/**
* The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
* slots.
@@ -66,9 +68,8 @@ public class DefaultScheduler implements InstanceListener {
// */
// private final Cache<ResourceId, Instance> ghostCache;
-
/** All tasks pending to be scheduled */
- private final LinkedBlockingQueue<ScheduledUnit> taskQueue = new LinkedBlockingQueue<ScheduledUnit>();
+ private final Queue<ScheduledUnit> taskQueue = new ArrayDeque<ScheduledUnit>();
/** The thread that runs the scheduling loop, picking up tasks to be scheduled and scheduling them. */
@@ -91,7 +92,6 @@ public class DefaultScheduler implements InstanceListener {
public DefaultScheduler(boolean rejectIfNoResourceAvailable) {
this.rejectIfNoResourceAvailable = rejectIfNoResourceAvailable;
-
// this.ghostCache = CacheBuilder.newBuilder()
// .initialCapacity(64) // easy start
// .maximumSize(1024) // retain some history
@@ -144,639 +144,36 @@ public class DefaultScheduler implements InstanceListener {
this.schedulerThread.setUncaughtExceptionHandler(handler);
}
-// /**
-// * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
-// *
-// * @param executionGraphToRemove
-// * the job to be removed
-// */
-// void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-//
-// boolean removedFromQueue = false;
-//
-// synchronized (this.jobQueue) {
-//
-// final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-// while (it.hasNext()) {
-//
-// final ExecutionGraph executionGraph = it.next();
-// if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
-// removedFromQueue = true;
-// it.remove();
-// break;
-// }
-// }
-// }
-//
-// if (!removedFromQueue) {
-// LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
-// + executionGraphToRemove.getJobID() + ") to remove");
-// }
-// }
-//
-// /**
-// * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
-// * to the strategies of the concrete scheduler implementation.
-// *
-// * @param executionGraph
-// * the job to be added to the scheduler
-// * @throws SchedulingException
-// * thrown if an error occurs and the scheduler does not accept the new job
-// */
-// public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
-//
-// final int requiredSlots = executionGraph.getRequiredSlots();
-// final int availableSlots = this.getInstanceManager().getNumberOfSlots();
-//
-// if(requiredSlots > availableSlots){
-// throw new SchedulingException(String.format(
-// "Not enough available task slots to run job %s (%s). Required: %d Available: %d . "
-// + "Either reduce the parallelism of your program, wait for other programs to finish, or increase "
-// + "the number of task slots in the cluster by adding more machines or increasing the number of slots "
-// + "per machine in conf/flink-conf.yaml .",
-// executionGraph.getJobName(), executionGraph.getJobID(), requiredSlots, availableSlots));
-// }
-//
-// // Subscribe to job status notifications
-// executionGraph.registerJobStatusListener(this);
-//
-// // Register execution listener for each vertex
-// final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
-// while (it2.hasNext()) {
-//
-// final ExecutionVertex vertex = it2.next();
-// vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
-// }
-//
-// // Register the scheduler as an execution stage listener
-// executionGraph.registerExecutionStageListener(this);
-//
-// // Add job to the job queue (important to add job to queue before requesting instances)
-// synchronized (this.jobQueue) {
-// this.jobQueue.add(executionGraph);
-// }
-//
-// // Request resources for the first stage of the job
-//
-// final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-// try {
-// requestInstances(executionStage);
-// } catch (InstanceException e) {
-// final String exceptionMessage = StringUtils.stringifyException(e);
-// LOG.error(exceptionMessage);
-// this.jobQueue.remove(executionGraph);
-// throw new SchedulingException(exceptionMessage);
-// }
-// }
-//
-// /**
-// * Returns the execution graph which is associated with the given job ID.
-// *
-// * @param jobID
-// * the job ID to search the execution graph for
-// * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
-// * exists
-// */
-// public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-//
-// synchronized (this.jobQueue) {
-//
-// final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-// while (it.hasNext()) {
-//
-// final ExecutionGraph executionGraph = it.next();
-// if (executionGraph.getJobID().equals(jobID)) {
-// return executionGraph;
-// }
-// }
-// }
-//
-// return null;
-// }
-//
-//
-//
-// public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-// final String optionalMessage) {
-//
-// if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
-// || newJobStatus == InternalJobStatus.CANCELED) {
-// removeJobFromSchedule(executionGraph);
-// }
-// }
-//
-// public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-//
-// // Request new instances if necessary
-// try {
-// requestInstances(executionStage);
-// } catch (InstanceException e) {
-// // TODO: Handle error correctly
-// LOG.error(StringUtils.stringifyException(e));
-// }
-//
-// // Deploy the assigned vertices
-// deployAssignedInputVertices(executionStage.getExecutionGraph());
-// }
-//
-//
-// /**
-// * Returns the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler.
-// *
-// * @return the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler
-// */
-// public InstanceManager getInstanceManager() {
-// return this.instanceManager;
-// }
-//
-//
-// /**
-// * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
-// * loaded instance manager.
-// *
-// * @param executionStage
-// * the execution stage to collect the required instances from
-// * @throws InstanceException
-// * thrown if the given execution graph is already processing its final stage
-// */
-// protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
-//
-// final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
-//
-// synchronized (executionStage) {
-//
-// final int requiredSlots = executionStage.getRequiredSlots();
-//
-// LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID());
-//
-// this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
-// requiredSlots);
-//
-// // Switch vertex state to assigning
-// final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
-// .getIndexOfCurrentExecutionStage(), true, true);
-// while (it2.hasNext()) {
-//
-// it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-// }
-// }
-// }
-//
-// void findVerticesToBeDeployed(final ExecutionVertex vertex,
-// final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
-// final Set<ExecutionVertex> alreadyVisited) {
-//
-// if (!alreadyVisited.add(vertex)) {
-// return;
-// }
-//
-// if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
-// final Instance instance = vertex.getAllocatedResource().getInstance();
-//
-// if (instance instanceof DummyInstance) {
-// LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
-// }
-//
-// List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
-// if (verticesForInstance == null) {
-// verticesForInstance = new ArrayList<ExecutionVertex>();
-// verticesToBeDeployed.put(instance, verticesForInstance);
-// }
-//
-// verticesForInstance.add(vertex);
-// }
-//
-// final int numberOfOutputGates = vertex.getNumberOfOutputGates();
-// for (int i = 0; i < numberOfOutputGates; ++i) {
-//
-// final ExecutionGate outputGate = vertex.getOutputGate(i);
-// boolean deployTarget;
-//
-// switch (outputGate.getChannelType()) {
-// case NETWORK:
-// deployTarget = false;
-// break;
-// case IN_MEMORY:
-// deployTarget = true;
-// break;
-// default:
-// throw new IllegalStateException("Unknown channel type");
-// }
-//
-// if (deployTarget) {
-//
-// final int numberOfOutputChannels = outputGate.getNumberOfEdges();
-// for (int j = 0; j < numberOfOutputChannels; ++j) {
-// final ExecutionEdge outputChannel = outputGate.getEdge(j);
-// final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
-// findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
-// }
-// }
-// }
-// }
-//
-// /**
-// * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
-// * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-// *
-// * @param startVertex
-// * the execution vertex to start the deployment from
-// */
-// public void deployAssignedVertices(final ExecutionVertex startVertex) {
-//
-// final JobID jobID = startVertex.getExecutionGraph().getJobID();
-//
-// final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-// final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-// findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-//
-// if (!verticesToBeDeployed.isEmpty()) {
-//
-// final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-// .entrySet()
-// .iterator();
-//
-// while (it2.hasNext()) {
-//
-// final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-// this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-// }
-// }
-// }
-//
-// /**
-// * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
-// * {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-// *
-// * @param pipeline
-// * the execution pipeline to be deployed
-// */
-// public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
-//
-// final JobID jobID = null;
-//
-// final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-// final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-// final Iterator<ExecutionVertex> it = pipeline.iterator();
-// while (it.hasNext()) {
-// findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
-// }
-//
-// if (!verticesToBeDeployed.isEmpty()) {
-//
-// final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-// .entrySet()
-// .iterator();
-//
-// while (it2.hasNext()) {
-//
-// final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-// this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-// }
-// }
-// }
-//
-// /**
-// * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
-// * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-// *
-// * @param startVertices
-// * the collection of execution vertices to start the deployment from
-// */
-// public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
-//
-// JobID jobID = null;
-//
-// final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-// final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-// for (final ExecutionVertex startVertex : startVertices) {
-//
-// if (jobID == null) {
-// jobID = startVertex.getExecutionGraph().getJobID();
-// }
-//
-// findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-// }
-//
-// if (!verticesToBeDeployed.isEmpty()) {
-//
-// final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-// .entrySet()
-// .iterator();
-//
-// while (it2.hasNext()) {
-//
-// final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-// this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-// }
-// }
-// }
-//
-// /**
-// * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
-// * stage and deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-// *
-// * @param executionGraph
-// * the execution graph to collect the vertices from
-// */
-// public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
-//
-// final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-// final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-//
-// final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-// for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
-//
-// final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
-// if (!startVertex.isInputVertex()) {
-// continue;
-// }
-//
-// for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
-// final ExecutionVertex vertex = startVertex.getGroupMember(j);
-// findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
-// }
-// }
-//
-// if (!verticesToBeDeployed.isEmpty()) {
-//
-// final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-// .entrySet()
-// .iterator();
-//
-// while (it2.hasNext()) {
-//
-// final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-// this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
-// }
-// }
-// }
-//
-//
-// @Override
-// public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-//
-// if (allocatedResources == null) {
-// LOG.error("Resource to lock is null!");
-// return;
-// }
-//
-// for (final AllocatedResource allocatedResource : allocatedResources) {
-// if (allocatedResource.getInstance() instanceof DummyInstance) {
-// LOG.debug("Available instance is of type DummyInstance!");
-// return;
-// }
-// }
-//
-// final ExecutionGraph eg = getExecutionGraphByID(jobID);
-//
-// if (eg == null) {
-// /*
-// * The job have have been canceled in the meantime, in this case
-// * we release the instance immediately.
-// */
-// try {
-// for (final AllocatedResource allocatedResource : allocatedResources) {
-// getInstanceManager().releaseAllocatedResource(allocatedResource);
-// }
-// } catch (InstanceException e) {
-// LOG.error(e);
-// }
-// return;
-// }
-//
-// final Runnable command = new Runnable() {
-//
-// /**
-// * {@inheritDoc}
-// */
-// @Override
-// public void run() {
-//
-// final ExecutionStage stage = eg.getCurrentExecutionStage();
-//
-// synchronized (stage) {
-//
-// for (final AllocatedResource allocatedResource : allocatedResources) {
-//
-// AllocatedResource resourceToBeReplaced = null;
-// // Important: only look for instances to be replaced in the current stage
-// final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
-// stage.getStageNumber());
-// while (groupIterator.hasNext()) {
-//
-// final ExecutionGroupVertex groupVertex = groupIterator.next();
-// for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-//
-// final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-//
-// if (vertex.getExecutionState() == ExecutionState.SCHEDULED
-// && vertex.getAllocatedResource() != null) {
-// resourceToBeReplaced = vertex.getAllocatedResource();
-// break;
-// }
-// }
-//
-// if (resourceToBeReplaced != null) {
-// break;
-// }
-// }
-//
-// // For some reason, we don't need this instance
-// if (resourceToBeReplaced == null) {
-// LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
-// + eg.getJobID());
-// try {
-// getInstanceManager().releaseAllocatedResource(allocatedResource);
-// } catch (InstanceException e) {
-// LOG.error(e);
-// }
-// return;
-// }
-//
-// // Replace the selected instance
-// final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
-// while (it.hasNext()) {
-// final ExecutionVertex vertex = it.next();
-// vertex.setAllocatedResource(allocatedResource);
-// vertex.updateExecutionState(ExecutionState.ASSIGNED);
-// }
-// }
-// }
-//
-// // Deploy the assigned vertices
-// deployAssignedInputVertices(eg);
-//
-// }
-//
-// };
-//
-// eg.executeCommand(command);
-// }
-//
-// /**
-// * Checks if the given {@link AllocatedResource} is still required for the
-// * execution of the given execution graph. If the resource is no longer
-// * assigned to a vertex that is either currently running or about to run
-// * the given resource is returned to the instance manager for deallocation.
-// *
-// * @param executionGraph
-// * the execution graph the provided resource has been used for so far
-// * @param allocatedResource
-// * the allocated resource to check the assignment for
-// */
-// public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
-// final AllocatedResource allocatedResource) {
-//
-// if (allocatedResource == null) {
-// LOG.error("Resource to lock is null!");
-// return;
-// }
-//
-// if (allocatedResource.getInstance() instanceof DummyInstance) {
-// LOG.debug("Available instance is of type DummyInstance!");
-// return;
-// }
-//
-// boolean resourceCanBeReleased = true;
-// final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
-// while (it.hasNext()) {
-// final ExecutionVertex vertex = it.next();
-// final ExecutionState state = vertex.getExecutionState();
-//
-// if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
-// && state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-//
-// resourceCanBeReleased = false;
-// break;
-// }
-// }
-//
-// if (resourceCanBeReleased) {
-//
-// LOG.info("Releasing instance " + allocatedResource.getInstance());
-// try {
-// getInstanceManager().releaseAllocatedResource(allocatedResource);
-// } catch (InstanceException e) {
-// LOG.error(StringUtils.stringifyException(e));
-// }
-// }
-// }
-//
-// DeploymentManager getDeploymentManager() {
-// return this.deploymentManager;
-// }
-//
-//
-//
-// @Override
-// public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-//
-// final ExecutionGraph eg = getExecutionGraphByID(jobID);
-//
-// if (eg == null) {
-// LOG.error("Cannot find execution graph for job with ID " + jobID);
-// return;
-// }
-//
-// final Runnable command = new Runnable() {
-//
-// /**
-// * {@inheritDoc}
-// */
-// @Override
-// public void run() {
-//
-// synchronized (eg) {
-//
-// for (final AllocatedResource allocatedResource : allocatedResources) {
-//
-// LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
-// + " died.");
-//
-// final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
-//
-// if (executionGraph == null) {
-// LOG.error("Cannot find execution graph for job " + jobID);
-// return;
-// }
-//
-// Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
-//
-// // Assign vertices back to a dummy resource.
-// final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
-// final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
-// new AllocationID());
-//
-// while (vertexIter.hasNext()) {
-// final ExecutionVertex vertex = vertexIter.next();
-// vertex.setAllocatedResource(dummyResource);
-// }
-//
-// final String failureMessage = allocatedResource.getInstance().getName() + " died";
-//
-// vertexIter = allocatedResource.assignedVertices();
-//
-// while (vertexIter.hasNext()) {
-// final ExecutionVertex vertex = vertexIter.next();
-// final ExecutionState state = vertex.getExecutionState();
-//
-// switch (state) {
-// case ASSIGNED:
-// case READY:
-// case STARTING:
-// case RUNNING:
-// case FINISHING:
-//
-// vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
-//
-// break;
-// default:
-// }
-// }
-//
-// // TODO: Fix this
-// /*
-// * try {
-// * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
-// * } catch (InstanceException e) {
-// * e.printStackTrace();
-// * // TODO: Cancel the entire job in this case
-// * }
-// */
-// }
-// }
-//
-// final InternalJobStatus js = eg.getJobStatus();
-// if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
-//
-// // TODO: Fix this
-// // deployAssignedVertices(eg);
-//
-// final ExecutionStage stage = eg.getCurrentExecutionStage();
-//
-// try {
-// requestInstances(stage);
-// } catch (InstanceException e) {
-// e.printStackTrace();
-// // TODO: Cancel the entire job in this case
-// }
-// }
-// }
-// };
-//
-// eg.executeCommand(command);
-// }
// --------------------------------------------------------------------------------------------
- // Canceling
+ // Scheduling
// --------------------------------------------------------------------------------------------
- public void removeAllTasksForJob(JobID job) {
+ /**
+ * @param task
+ * @param queueIfNoResource If true, this call will queue the request if no resource is immediately
+ * available. If false, it will throw a {@link NoResourceAvailableException}
+ * if no resource is immediately available.
+ */
+ public void scheduleTask(ScheduledUnit task, boolean queueIfNoResource) {
+ if (task == null) {
+ throw new IllegalArgumentException();
+ }
+
+ // if there is already a slot for that resource
+ AllocatedSlot existing = this.allocatedSlots.get(task.getSharedResourceId());
+ if (existing != null) {
+ // try to attach to the existing slot
+ if (existing.runTask(task.getTaskVertex())) {
+ // all good, we are done
+ return;
+ }
+ // else: the slot was deallocated, we need to proceed as if there was none
+ }
+
+ // check if there is a slot that has an available sub-slot for that group-vertex
+ // TODO
+
}
@@ -836,101 +233,114 @@ public class DefaultScheduler implements InstanceListener {
// Scheduling
// --------------------------------------------------------------------------------------------
- /**
- * Schedules the given unit to an available resource. This call blocks if no resource
- * is currently available
- *
- * @param unit The unit to be scheduled.
- */
- protected void scheduleNextUnit(ScheduledUnit unit) {
- if (unit == null) {
- throw new IllegalArgumentException("Unit to schedule must not be null.");
- }
-
- // see if the resource Id has already an assigned resource
- AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
-
- if (resource == null) {
- // not yet allocated. find a slot to schedule to
- try {
- resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
- if (resource == null) {
- throw new RuntimeException("Error: The resource to schedule to is null.");
- }
- }
- catch (Exception e) {
- // we cannot go on, the task needs to know what to do now.
- unit.getTaskVertex().handleException(e);
- return;
- }
- }
-
- resource.runTask(unit.getTaskVertex());
- }
+// /**
+// * Schedules the given unit to an available resource. This call blocks if no resource
+// * is currently available
+// *
+// * @param unit The unit to be scheduled.
+// */
+// protected void scheduleQueuedUnit(ScheduledUnit unit) {
+// if (unit == null) {
+// throw new IllegalArgumentException("Unit to schedule must not be null.");
+// }
+//
+// // see if the resource Id has already an assigned resource
+// AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
+//
+// if (resource == null) {
+// // not yet allocated. find a slot to schedule to
+// try {
+// resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
+// if (resource == null) {
+// throw new RuntimeException("Error: The resource to schedule to is null.");
+// }
+// }
+// catch (Exception e) {
+// // we cannot go on, the task needs to know what to do now.
+// unit.getTaskVertex().handleException(e);
+// return;
+// }
+// }
+//
+// resource.runTask(unit.getTaskVertex());
+// }
/**
* Acquires a resource to schedule the given unit to. This call may block if no
* resource is currently available, or throw an exception, based on the given flag.
*
* @param unit The unit to find a resource for.
- * @param exceptionOnNoAvailability If true, this call should not block is no resource is available,
- * but throw a {@link NoResourceAvailableException}.
* @return The resource to schedule the execution of the given unit on.
*
* @throws NoResourceAvailableException If the {@code exceptionOnNoAvailability} flag is true and the scheduler
* has currently no resources available.
*/
- protected AllocatedSlot getResourceToScheduleUnit(ScheduledUnit unit, boolean exceptionOnNoAvailability)
+ protected AllocatedSlot getNewSlotForTask(ScheduledUnit unit, boolean queueIfNoResource)
throws NoResourceAvailableException
{
- AllocatedSlot slot = null;
-
- while (true) {
- synchronized (this.lock) {
- Instance instanceToUse = this.instancesWithAvailableResources.poll();
-
- // if there is nothing, throw an exception or wait, depending on what is configured
- if (instanceToUse == null) {
- if (exceptionOnNoAvailability) {
- throw new NoResourceAvailableException(unit);
+ synchronized (this.lock) {
+ Instance instanceToUse = this.instancesWithAvailableResources.poll();
+
+ // if there is nothing, throw an exception or wait, depending on what is configured
+ if (instanceToUse != null) {
+ try {
+ AllocatedSlot slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
+
+ // if the instance has further available slots, re-add it to the set of available resources.
+ if (instanceToUse.hasResourcesAvailable()) {
+ this.instancesWithAvailableResources.add(instanceToUse);
}
- else {
- try {
- do {
- this.lock.wait(2000);
- }
- while (!shutdown.get() &&
- (instanceToUse = this.instancesWithAvailableResources.poll()) == null);
- }
- catch (InterruptedException e) {
- throw new NoResourceAvailableException("The scheduler was interrupted.");
+
+ if (slot != null) {
+ AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
+ if (previous != null) {
+ // concurrently, someone allocated a slot for that ID
+ // release the new one
+ slot.cancelResource();
+ slot = previous;
}
}
- }
-
- // at this point, we have an instance. request a slot from the instance
- try {
- slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
+ // else fall through the loop
}
catch (InstanceDiedException e) {
// the instance died it has not yet been propagated to this scheduler
// remove the instance from the set of available instances
this.allInstances.remove(instanceToUse);
}
+ }
+
+
+ if (queueIfNoResource) {
+ this.taskQueue.add(unit);
+ }
+ else {
+ throw new NoResourceAvailableException(unit);
+ }
+ // at this point, we have an instance. request a slot from the instance
+
// if the instance has further available slots, re-add it to the set of available
// resources.
- // if it does not, but asynchronously
+ // if it does not, but asynchronously a slot became available, we may attempt to add the
+ // instance twice, which does not matter because of the set semantics of the "instancesWithAvailableResources"
if (instanceToUse.hasResourcesAvailable()) {
this.instancesWithAvailableResources.add(instanceToUse);
}
if (slot != null) {
- return slot;
+ AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
+ if (previous != null) {
+ // concurrently, someone allocated a slot for that ID
+ // release the new one
+ slot.cancelResource();
+ slot = previous;
+ }
}
// else fall through the loop
}
}
+
+ return slot;
}
protected void runSchedulerLoop() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 8caf64a..338529f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -15,6 +15,8 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
import org.apache.flink.runtime.jobgraph.JobID;
@@ -26,6 +28,8 @@ public class ScheduledUnit {
private final ResourceId resourceId;
+ private final AtomicBoolean scheduled = new AtomicBoolean(false);
+
public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex) {
this(jobId, taskVertex, new ResourceId());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 02c814c..b047222 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -251,5 +251,15 @@ public class LocalInstanceManagerTest {
@Override
public void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed) {}
+
+ @Override
+ public boolean sendHeartbeat(InstanceID taskManagerId) {
+ return true;
+ }
+
+ @Override
+ public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
+ return new InstanceID();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
new file mode 100644
index 0000000..d45b131
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.ipc;
+
+import static org.junit.Assert.*;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.LogUtils;
+
+
+public class RpcTest {
+
+ @BeforeClass
+ public static void initLogger() {
+ LogUtils.initializeDefaultConsoleLogger();
+ }
+
+
+ @Test
+ public void testRpc() {
+ try {
+ Server server = null;
+ TestProtocol proxy = null;
+
+ try {
+ // setup the RPCs
+ int port = getAvailablePort();
+ server = RPC.getServer(new TestProtocolImpl(), "localhost", port, 4);
+ server.start();
+
+ proxy = RPC.getProxy(TestProtocol.class, new InetSocketAddress("localhost", port), NetUtils.getSocketFactory());
+
+ // make a few calls with various types
+// proxy.methodWithNoParameters();
+
+ assertEquals(19, proxy.methodWithPrimitives(16, new StringValue("abc")));
+ assertEquals(new DoubleValue(17.0), proxy.methodWithWritables(new LongValue(17)));
+ }
+ finally {
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+ private static final int getAvailablePort() throws IOException {
+ ServerSocket serverSocket = null;
+ for (int i = 0; i < 50; i++){
+ try {
+ serverSocket = new ServerSocket(0);
+ int port = serverSocket.getLocalPort();
+ if (port != 0) {
+ return port;
+ }
+ }
+ catch (IOException e) {}
+ finally {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ }
+ }
+
+ throw new IOException("Could not find a free port.");
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static interface TestProtocol extends VersionedProtocol {
+
+ public void methodWithNoParameters();
+
+ public int methodWithPrimitives(int intParam, StringValue writableParam);
+
+ public DoubleValue methodWithWritables(LongValue writableParam);
+ }
+
+
+ public static final class TestProtocolImpl implements TestProtocol {
+
+ @Override
+ public void methodWithNoParameters() {}
+
+ @Override
+ public int methodWithPrimitives(int intParam, StringValue writableParam) {
+ return intParam + writableParam.length();
+ }
+
+ @Override
+ public DoubleValue methodWithWritables(LongValue writableParam) {
+ return new DoubleValue(writableParam.getValue());
+ }
+ }
+}