You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:52 UTC

[28/63] [abbrv] git commit: Adapt RPC to support primitive types as parameters and return values.

Adapt RPC to support primitive types as parameters and return values.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/aa7550aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/aa7550aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/aa7550aa

Branch: refs/heads/master
Commit: aa7550aa2d05940eed695cfa7a1790197255e47d
Parents: 2d6199f
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 21 19:09:26 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../flink/types/JavaToValueConverter.java       |  98 +++
 .../java/org/apache/flink/util/ClassUtils.java  |  32 +
 .../flink/types/JavaToValueConverterTest.java   |  96 +++
 .../flink/runtime/instance/AllocatedSlot.java   |  15 +-
 .../org/apache/flink/runtime/ipc/Client.java    |  14 +-
 .../java/org/apache/flink/runtime/ipc/RPC.java  | 166 ++--
 .../org/apache/flink/runtime/ipc/Server.java    |  17 +-
 .../jobmanager/scheduler/DefaultScheduler.java  | 800 +++----------------
 .../jobmanager/scheduler/ScheduledUnit.java     |   4 +
 .../instance/LocalInstanceManagerTest.java      |  10 +
 .../org/apache/flink/runtime/ipc/RpcTest.java   | 130 +++
 11 files changed, 580 insertions(+), 802 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java b/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
new file mode 100644
index 0000000..1e07e3e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/JavaToValueConverter.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+/**
+ * Converts between plain Java objects (boxed primitives and {@link String}) and the
+ * {@link Value} types that wrap them.
+ */
+public class JavaToValueConverter {
+
+	/**
+	 * Wraps a {@link String} or a boxed primitive into the matching {@link Value} type.
+	 *
+	 * @param boxed The object to wrap, may be {@code null}.
+	 * @return The wrapping value, or {@code null}, if {@code boxed} is {@code null}.
+	 * @throws IllegalArgumentException Thrown, if the object is neither a String nor a boxed primitive.
+	 */
+	public static Value convertBoxedJavaType(Object boxed) {
+		if (boxed == null) {
+			return null;
+		}
+
+		// identity comparison is enough: String and the boxed primitive classes are final
+		final Class<?> clazz = boxed.getClass();
+
+		if (clazz == String.class) {
+			return new StringValue((String) boxed);
+		}
+		else if (clazz == Integer.class) {
+			return new IntValue((Integer) boxed);
+		}
+		else if (clazz == Long.class) {
+			return new LongValue((Long) boxed);
+		}
+		else if (clazz == Double.class) {
+			return new DoubleValue((Double) boxed);
+		}
+		else if (clazz == Float.class) {
+			return new FloatValue((Float) boxed);
+		}
+		else if (clazz == Boolean.class) {
+			return new BooleanValue((Boolean) boxed);
+		}
+		else if (clazz == Byte.class) {
+			return new ByteValue((Byte) boxed);
+		}
+		else if (clazz == Short.class) {
+			return new ShortValue((Short) boxed);
+		}
+		else if (clazz == Character.class) {
+			return new CharValue((Character) boxed);
+		}
+		else {
+			// name the offending class, so the caller can tell which argument was rejected
+			throw new IllegalArgumentException("Object is no primitive Java type: " + clazz.getName());
+		}
+	}
+
+	/**
+	 * Unwraps one of the supported {@link Value} types into the String or boxed primitive it holds.
+	 * This is the inverse of {@link #convertBoxedJavaType(Object)}.
+	 *
+	 * @param value The value to unwrap, may be {@code null}.
+	 * @return The wrapped Java object, or {@code null}, if {@code value} is {@code null}.
+	 * @throws IllegalArgumentException Thrown, if the value is none of the supported wrapper types.
+	 */
+	public static Object convertValueType(Value value) {
+		if (value == null) {
+			return null;
+		}
+
+		if (value instanceof StringValue) {
+			return ((StringValue) value).getValue();
+		}
+		else if (value instanceof IntValue) {
+			return ((IntValue) value).getValue();
+		}
+		else if (value instanceof LongValue) {
+			return ((LongValue) value).getValue();
+		}
+		else if (value instanceof DoubleValue) {
+			return ((DoubleValue) value).getValue();
+		}
+		else if (value instanceof FloatValue) {
+			return ((FloatValue) value).getValue();
+		}
+		else if (value instanceof BooleanValue) {
+			return ((BooleanValue) value).getValue();
+		}
+		else if (value instanceof ByteValue) {
+			return ((ByteValue) value).getValue();
+		}
+		else if (value instanceof ShortValue) {
+			return ((ShortValue) value).getValue();
+		}
+		else if (value instanceof CharValue) {
+			return ((CharValue) value).getValue();
+		}
+		else {
+			// the argument is a Value here, not an arbitrary object: say so and name its class
+			throw new IllegalArgumentException(
+					"Value is no wrapper of a primitive Java type: " + value.getClass().getName());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
index 03a3a23..00ad8e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ClassUtils.java
@@ -19,6 +19,9 @@
 
 package org.apache.flink.util;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.protocols.VersionedProtocol;
@@ -93,4 +96,33 @@ public final class ClassUtils {
 	private static ClassLoader getClassLoader() {
 		return ClassUtils.class.getClassLoader();
 	}
+	
+	/**
+	 * Resolves a class by its name, also accepting the names stored in the primitive type
+	 * lookup table. A name found in that table yields the table's class object; every other
+	 * name is passed on to {@link Class#forName(String)}.
+	 *
+	 * @param className The name of the class to resolve, must not be {@code null}.
+	 * @return The class registered under, or loaded for, the given name.
+	 * @throws ClassNotFoundException Thrown, if the name is not in the table and cannot be loaded.
+	 * @throws NullPointerException Thrown, if {@code className} is {@code null}.
+	 */
+	public static Class<?> resolveClassPrimitiveAware(String className) throws ClassNotFoundException {
+		if (className == null) {
+			throw new NullPointerException();
+		}
+
+		final Class<?> primitive = PRIMITIVE_TYPES.get(className);
+		return primitive == null ? Class.forName(className) : primitive;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Lookup table from the keyword of each Java primitive type (and <code>void</code>) to its
+	 * class object. It is consulted by {@link #resolveClassPrimitiveAware(String)} before the
+	 * name is handed to {@link Class#forName(String)}.
+	 */
+	private static final Map<String, Class<?>> PRIMITIVE_TYPES = new HashMap<String, Class<?>>(9);
+	
+	// populated once at class initialization; no code visible in this change modifies it afterwards
+	static {
+		PRIMITIVE_TYPES.put("byte", byte.class);
+		PRIMITIVE_TYPES.put("short", short.class);
+		PRIMITIVE_TYPES.put("int", int.class);
+		PRIMITIVE_TYPES.put("long", long.class);
+		PRIMITIVE_TYPES.put("float", float.class);
+		PRIMITIVE_TYPES.put("double", double.class);
+		PRIMITIVE_TYPES.put("boolean", boolean.class);
+		PRIMITIVE_TYPES.put("char", char.class);
+		PRIMITIVE_TYPES.put("void", void.class);
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java b/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
new file mode 100644
index 0000000..cef639e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/JavaToValueConverterTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link JavaToValueConverter}.
+ * <p>
+ * Unexpected exceptions are deliberately not caught: none of the calls throws a checked
+ * exception, and letting a runtime exception propagate makes JUnit report it with its
+ * type and full stack trace.
+ */
+public class JavaToValueConverterTest {
+
+	/**
+	 * Checks that null, strings and all boxed primitives are wrapped into the matching
+	 * {@link Value} type, and that other objects are rejected.
+	 */
+	@Test
+	public void testJavaToValueConversion() {
+		assertNull(JavaToValueConverter.convertBoxedJavaType(null));
+
+		assertEquals(new StringValue("123Test"), JavaToValueConverter.convertBoxedJavaType("123Test"));
+		assertEquals(new ByteValue((byte) 44), JavaToValueConverter.convertBoxedJavaType((byte) 44));
+		assertEquals(new ShortValue((short) 10000), JavaToValueConverter.convertBoxedJavaType((short) 10000));
+		assertEquals(new IntValue(3567564), JavaToValueConverter.convertBoxedJavaType(3567564));
+		assertEquals(new LongValue(767692734), JavaToValueConverter.convertBoxedJavaType(767692734L));
+		assertEquals(new FloatValue(17.5f), JavaToValueConverter.convertBoxedJavaType(17.5f));
+		assertEquals(new DoubleValue(3.1415926), JavaToValueConverter.convertBoxedJavaType(3.1415926));
+		assertEquals(new BooleanValue(true), JavaToValueConverter.convertBoxedJavaType(true));
+		assertEquals(new CharValue('@'), JavaToValueConverter.convertBoxedJavaType('@'));
+
+		try {
+			JavaToValueConverter.convertBoxedJavaType(new ArrayList<Object>());
+			fail("Accepted invalid type.");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+	}
+
+	/**
+	 * Checks that null and all supported {@link Value} types are unwrapped into the matching
+	 * Java object, and that other value types are rejected.
+	 */
+	@Test
+	public void testValueToJavaConversion() {
+		assertNull(JavaToValueConverter.convertValueType(null));
+
+		assertEquals("123Test", JavaToValueConverter.convertValueType(new StringValue("123Test")));
+		assertEquals((byte) 44, JavaToValueConverter.convertValueType(new ByteValue((byte) 44)));
+		assertEquals((short) 10000, JavaToValueConverter.convertValueType(new ShortValue((short) 10000)));
+		assertEquals(3567564, JavaToValueConverter.convertValueType(new IntValue(3567564)));
+		assertEquals(767692734L, JavaToValueConverter.convertValueType(new LongValue(767692734)));
+		assertEquals(17.5f, JavaToValueConverter.convertValueType(new FloatValue(17.5f)));
+		assertEquals(3.1415926, JavaToValueConverter.convertValueType(new DoubleValue(3.1415926)));
+		assertEquals(true, JavaToValueConverter.convertValueType(new BooleanValue(true)));
+		assertEquals('@', JavaToValueConverter.convertValueType(new CharValue('@')));
+
+		try {
+			JavaToValueConverter.convertValueType(new MyValue());
+			fail("Accepted invalid type.");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+	}
+
+	/** A {@link Value} implementation that the converter does not know, used for the negative test. */
+	private static final class MyValue implements Value {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void write(DataOutputView out) {}
+
+		@Override
+		public void read(DataInputView in) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index 71af9db..6289d45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.instance;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.ResourceId;
@@ -38,6 +40,9 @@ public class AllocatedSlot {
 	
 	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
+	
+	/** Flag that marks the schedule as active */
+	private final AtomicBoolean active = new AtomicBoolean(true);
 
 
 	public AllocatedSlot(JobID jobID, ResourceId resourceId, Instance instance, int slotNumber) {
@@ -72,8 +77,14 @@ public class AllocatedSlot {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public void runTask(ExecutionVertex2 vertex) {
-		
+	/**
+	 * NOTE(review): the body visible in this change ignores {@code vertex} and returns
+	 * {@code true} unconditionally, so the {@code false} case described below is not
+	 * implemented here yet; presumably a placeholder, confirm.
+	 * 
+	 * @param vertex The execution vertex whose task is to be run (currently unused).
+	 * 
+	 * @return True, if the task was scheduled correctly, false if the slot was asynchronously deallocated
+	 *         in the meantime.
+	 */
+	public boolean runTask(ExecutionVertex2 vertex) {
+		return true;
 	}
 	
 	public void cancelResource() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
index 87267fe..3c98e78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
@@ -56,13 +56,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-/**
- * A client for an IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
- * a port and is defined by a parameter class and a value class.
- * 
- * @see Server
- */
+
 public class Client {
 
 	public static final Logger LOG = LoggerFactory.getLogger(Client.class);
@@ -633,9 +627,6 @@ public class Client {
 		}
 	}
 
-	/**
-	 * Construct an IPC client whose values are of the given {@link Writable} class.
-	 */
 	public Client(final SocketFactory factory) {
 		this.maxIdleTime = 1000;
 		this.maxRetries = 10;
@@ -646,9 +637,6 @@ public class Client {
 
 	/**
 	 * Construct an IPC client with the default SocketFactory
-	 * 
-	 * @param valueClass
-	 * @param conf
 	 */
 	public Client() {
 		this(NetUtils.getDefaultSocketFactory());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
index 1768687..d127d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
@@ -46,47 +46,33 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.types.JavaToValueConverter;
+import org.apache.flink.types.Value;
 import org.apache.flink.util.ClassUtils;
 
-/**
- * A simple RPC mechanism.
- * A <i>protocol</i> is a Java interface. All parameters and return types must
- * be one of:
- * <ul>
- * <li>a primitive type, <code>boolean</code>, <code>byte</code>, <code>char</code>, <code>short</code>,
- * <code>int</code>, <code>long</code>, <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
- * <li>a {@link String}; or</li>
- * <li>a {@link Writable}; or</li>
- * <li>an array of the above types</li>
- * </ul>
- * All methods in the protocol should throw only IOException. No field data of
- * the protocol instance is transmitted.
- */
+
 public class RPC {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RPC.class);
 
-	private RPC() {
-	} // no public ctor
+	private RPC() {}
 
 	/** A method invocation, including the method name and its parameters. */
 	private static class Invocation implements IOReadableWritable {
 
 		private String methodName;
 
-		private Class<? extends IOReadableWritable>[] parameterClasses;
+		private Class<?>[] parameterClasses;
 
-		private IOReadableWritable[] parameters;
+		private Object[] parameters;
 
 		@SuppressWarnings("unused")
 		public Invocation() {
 		}
 
-		// TODO: See if type safety can be improved here
-		@SuppressWarnings("unchecked")
-		public Invocation(Method method, IOReadableWritable[] parameters) {
+		public Invocation(Method method, Object[] parameters) {
 			this.methodName = method.getName();
-			this.parameterClasses = (Class<? extends IOReadableWritable>[]) method.getParameterTypes();
+			this.parameterClasses = method.getParameterTypes();
 			this.parameters = parameters;
 		}
 
@@ -96,21 +82,21 @@ public class RPC {
 		}
 
 		/** The parameter classes. */
-		public Class<? extends IOReadableWritable>[] getParameterClasses() {
+		public Class<?>[] getParameterClasses() {
 			return parameterClasses;
 		}
 
 		/** The parameter instances. */
-		public IOReadableWritable[] getParameters() {
+		public Object[] getParameters() {
 			return parameters;
 		}
 
-		// TODO: See if type safety can be improved here
+
 		@SuppressWarnings("unchecked")
 		public void read(DataInputView in) throws IOException {
 
 			this.methodName = StringRecord.readString(in);
-			this.parameters = new IOReadableWritable[in.readInt()];
+			this.parameters = new Object[in.readInt()];
 			this.parameterClasses = new Class[parameters.length];
 
 			for (int i = 0; i < parameters.length; i++) {
@@ -118,27 +104,28 @@ public class RPC {
 				// Read class name for parameter and try to get class to that name
 				final String className = StringRecord.readString(in);
 				try {
-					parameterClasses[i] = ClassUtils.getRecordByName(className);
-				} catch (ClassNotFoundException cnfe) {
-					throw new IOException(cnfe.toString());
+					parameterClasses[i] = ClassUtils.resolveClassPrimitiveAware(className);
+				} 
+				catch (ClassNotFoundException e) {
+					throw new IOException(e);
 				}
 
 				// See if parameter is null
 				if (in.readBoolean()) {
+					IOReadableWritable value;
 					try {
 						final String parameterClassName = StringRecord.readString(in);
-						final Class<? extends IOReadableWritable> parameterClass = ClassUtils
-							.getRecordByName(parameterClassName);
-						parameters[i] = parameterClass.newInstance();
-					} catch (IllegalAccessException iae) {
-						throw new IOException(iae.toString());
-					} catch (InstantiationException ie) {
-						throw new IOException(ie.toString());
-					} catch (ClassNotFoundException cnfe) {
-						throw new IOException(cnfe.toString());
+						final Class<? extends IOReadableWritable> parameterClass =
+								(Class<? extends IOReadableWritable>) ClassUtils.resolveClassPrimitiveAware(parameterClassName);
+						
+						value = parameterClass.newInstance();
+						parameters[i] = value;
+					}
+					catch (Exception e) {
+						throw new IOException(e);
 					}
 					// Object will do everything else on its own
-					parameters[i].read(in);
+					value.read(in);
 				} else {
 					parameters[i] = null;
 				}
@@ -148,6 +135,8 @@ public class RPC {
 		public void write(DataOutputView out) throws IOException {
 			StringRecord.writeString(out, methodName);
 			out.writeInt(parameterClasses.length);
+
+			// at this point, type conversion should have happened
 			for (int i = 0; i < parameterClasses.length; i++) {
 				StringRecord.writeString(out, parameterClasses[i].getName());
 				if (parameters[i] == null) {
@@ -155,7 +144,39 @@ public class RPC {
 				} else {
 					out.writeBoolean(true);
 					StringRecord.writeString(out, parameters[i].getClass().getName());
-					parameters[i].write(out);
+					((IOReadableWritable) parameters[i]).write(out);
+				}
+			}
+		}
+		
+		/**
+		 * Replaces every argument whose declared parameter type is not an {@link IOReadableWritable}
+		 * with the {@link Value} produced by {@link JavaToValueConverter#convertBoxedJavaType(Object)}.
+		 * Arguments with an {@link IOReadableWritable} parameter type are left untouched.
+		 * Any failure is logged before it is thrown.
+		 *
+		 * @throws IOException Thrown, if an argument cannot be converted. The original exception is
+		 *                     attached as the cause.
+		 */
+		public void doTypeConversion() throws IOException {
+			try {
+				for (int i = 0; i < parameterClasses.length; i++) {
+					if (!IOReadableWritable.class.isAssignableFrom(parameterClasses[i])) {
+						try {
+							parameters[i] = JavaToValueConverter.convertBoxedJavaType(parameters[i]);
+						}
+						catch (IllegalArgumentException e) {
+							// chain the converter's exception, so its details are not lost
+							throw new IOException("Argument " + i + " of method " + methodName
+									+ " is not a primitive type (or boxed primitive) and not of type IOReadableWriteable", e);
+						}
+					}
+				}
+			}
+			catch (IOException e) {
+				LOG.error(e.getMessage(), e);
+				throw e;
+			}
+			catch (Exception e) {
+				LOG.error(e.getMessage(), e);
+				throw new IOException(e);
+			}
+		}
+		
+		/**
+		 * Reverses {@link #doTypeConversion()}: every parameter whose declared parameter type is not
+		 * an {@link IOReadableWritable} is replaced by the result of
+		 * {@link JavaToValueConverter#convertValueType(Value)}.
+		 * <p>
+		 * Such a parameter is cast to {@link Value} without a check, so a parameter of another type
+		 * causes a {@link ClassCastException}. An {@link IllegalArgumentException} from the converter
+		 * is not caught here either.
+		 */
+		public void undoTypeConversion() {
+			for (int i = 0; i < parameterClasses.length; i++) {
+				if (!IOReadableWritable.class.isAssignableFrom(parameterClasses[i])) {
+					parameters[i] = JavaToValueConverter.convertValueType((Value) parameters[i]);
 				}
 			}
 		}
@@ -180,14 +201,6 @@ public class RPC {
 	static private class ClientCache {
 		private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();
 
-		/**
-		 * Construct & cache an IPC client with the user-provided SocketFactory
-		 * if no cached client exists.
-		 * 
-		 * @param conf
-		 *        Configuration
-		 * @return an IPC client
-		 */
 		private synchronized Client getClient(SocketFactory factory) {
 			// Construct & cache client. The configuration is only used for timeout,
 			// and Clients have connection pools. So we can either (a) lose some
@@ -236,26 +249,17 @@ public class RPC {
 		}
 
 		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-
-			// TODO clean up
-			IOReadableWritable[] castArgs = null;
-			if (args != null) {
-				castArgs = new IOReadableWritable[args.length];
-
-				// Check if args are instances of ReadableWritable
-				for (int i = 0; i < args.length; i++) {
-					if ((args[i] != null) && !(args[i] instanceof IOReadableWritable)) {
-						throw new IOException("Argument " + i + " of method " + method.getName()
-							+ " is not of type IOReadableWriteable");
-					} else {
-						castArgs[i] = (IOReadableWritable) args[i];
-					}
-				}
+			Invocation invocation = new Invocation(method, args);
+			invocation.doTypeConversion();
+			
+			Object retValue = this.client.call(invocation, this.address, method.getDeclaringClass());
+			
+			if (IOReadableWritable.class.isAssignableFrom(method.getReturnType())) {
+				return retValue;
+			}
+			else {
+				return JavaToValueConverter.convertValueType((Value) retValue);
 			}
-			final IOReadableWritable value = this.client.call(new Invocation(method, castArgs), this.address, method
-				.getDeclaringClass());
-
-			return value;
 		}
 
 		/* close the IPC client that's responsible for this invoker's RPCs */
@@ -328,11 +332,6 @@ public class RPC {
 
 	/**
 	 * Construct a client-side proxy object with the default SocketFactory
-	 * 
-	 * @param protocol
-	 * @param addr
-	 * @return
-	 * @throws IOException
 	 */
 	public static <V extends VersionedProtocol> V getProxy(Class<V> protocol, InetSocketAddress addr)
 			throws IOException {
@@ -370,8 +369,6 @@ public class RPC {
 		 * 
 		 * @param instance
 		 *        the instance whose methods will be called
-		 * @param conf
-		 *        the configuration to use
 		 * @param bindAddress
 		 *        the address to bind on to listen for connection
 		 * @param port
@@ -395,8 +392,6 @@ public class RPC {
 		 * 
 		 * @param instance
 		 *        the instance whose methods will be called
-		 * @param conf
-		 *        the configuration to use
 		 * @param bindAddress
 		 *        the address to bind on to listen for connection
 		 * @param port
@@ -415,12 +410,26 @@ public class RPC {
 			try {
 				
 				final Invocation call = (Invocation) param;
+				call.undoTypeConversion();
 				
 				final Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());
 				method.setAccessible(true);
 
 				final Object value = method.invoke((Object) instance, (Object[]) call.getParameters());
-				return (IOReadableWritable) value;
+
+				if (IOReadableWritable.class.isAssignableFrom(method.getReturnType())) {
+					return (IOReadableWritable) value;
+				}
+				else {
+					try {
+						return JavaToValueConverter.convertBoxedJavaType(value);
+					}
+					catch (IllegalArgumentException e) {
+						throw new IOException("The return type of method " + method.getName()
+								+ " is not a primitive type (or boxed primitive) and not of type IOReadableWriteable");
+					}
+				}
+
 			} catch (InvocationTargetException e) {
 				
 				final Throwable target = e.getTargetException();
@@ -431,7 +440,8 @@ public class RPC {
 					ioe.setStackTrace(target.getStackTrace());
 					throw ioe;
 				}
-			} catch (Throwable e) {
+			}
+			catch (Throwable e) {
 				final IOException ioe = new IOException(e.toString());
 				ioe.setStackTrace(e.getStackTrace());
 				throw ioe;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
index bde6847..3dcb5b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
@@ -67,13 +67,7 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.util.ClassUtils;
 
-/**
- * An abstract IPC service. IPC calls take a single {@link Writable} as a
- * parameter, and return a {@link Writable} as their value. A service runs on
- * a port and is defined by a parameter class and a value class.
- * 
- * @see Client
- */
+
 public abstract class Server {
 
 	public static final Logger LOG = LoggerFactory.getLogger(Server.class);
@@ -101,12 +95,7 @@ public abstract class Server {
 		return protocol;
 	}
 
-	/**
-	 * Returns the server instance called under or null. May be called under {@link #call(Writable, long)}
-	 * implementations, and under {@link Writable} methods of paramters and return values. Permits applications to
-	 * access
-	 * the server context.
-	 */
+
 	public static Server get() {
 		return SERVER.get();
 	}
@@ -119,7 +108,7 @@ public abstract class Server {
 
 	/**
 	 * Returns the remote side ip address when invoked inside an RPC
-	 * Returns null incase of an error.
+	 * Returns null in case of an error.
 	 */
 	public static InetAddress getRemoteIp() {
 		Call call = CurCall.get();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index d23d35f..e038d7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.HashMap;
+import java.util.ArrayDeque;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +36,9 @@ import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
 import org.apache.flink.runtime.jobgraph.JobID;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
  * slots.
@@ -66,9 +68,8 @@ public class DefaultScheduler implements InstanceListener {
 //	 */
 //	private final Cache<ResourceId, Instance> ghostCache;
 	
-	
 	/** All tasks pending to be scheduled */
-	private final LinkedBlockingQueue<ScheduledUnit> taskQueue = new LinkedBlockingQueue<ScheduledUnit>();
+	private final Queue<ScheduledUnit> taskQueue = new ArrayDeque<ScheduledUnit>();
 
 	
 	/** The thread that runs the scheduling loop, picking up tasks to be scheduled and scheduling them. */
@@ -91,7 +92,6 @@ public class DefaultScheduler implements InstanceListener {
 	public DefaultScheduler(boolean rejectIfNoResourceAvailable) {
 		this.rejectIfNoResourceAvailable = rejectIfNoResourceAvailable;
 		
-		
 //		this.ghostCache = CacheBuilder.newBuilder()
 //				.initialCapacity(64)	// easy start
 //				.maximumSize(1024)		// retain some history
@@ -144,639 +144,36 @@ public class DefaultScheduler implements InstanceListener {
 		this.schedulerThread.setUncaughtExceptionHandler(handler);
 	}
 
-//	/**
-//	 * Removes the job represented by the given {@link ExecutionGraph} from the scheduler.
-//	 *
-//	 * @param executionGraphToRemove
-//	 *        the job to be removed
-//	 */
-//	void removeJobFromSchedule(final ExecutionGraph executionGraphToRemove) {
-//
-//		boolean removedFromQueue = false;
-//
-//		synchronized (this.jobQueue) {
-//
-//			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-//			while (it.hasNext()) {
-//
-//				final ExecutionGraph executionGraph = it.next();
-//				if (executionGraph.getJobID().equals(executionGraphToRemove.getJobID())) {
-//					removedFromQueue = true;
-//					it.remove();
-//					break;
-//				}
-//			}
-//		}
-//
-//		if (!removedFromQueue) {
-//			LOG.error("Cannot find job " + executionGraphToRemove.getJobName() + " ("
-//					+ executionGraphToRemove.getJobID() + ") to remove");
-//		}
-//	}
-//
-//	/**
-//	 * Adds a job represented by an {@link ExecutionGraph} object to the scheduler. The job is then executed according
-//	 * to the strategies of the concrete scheduler implementation.
-//	 *
-//	 * @param executionGraph
-//	 *        the job to be added to the scheduler
-//	 * @throws SchedulingException
-//	 *         thrown if an error occurs and the scheduler does not accept the new job
-//	 */
-//	public void scheduleJob(final ExecutionGraph executionGraph) throws SchedulingException {
-//
-//		final int requiredSlots = executionGraph.getRequiredSlots();
-//		final int availableSlots = this.getInstanceManager().getNumberOfSlots();
-//
-//		if(requiredSlots > availableSlots){
-//			throw new SchedulingException(String.format(
-//					"Not enough available task slots to run job %s (%s). Required: %d Available: %d . "
-//					+ "Either reduce the parallelism of your program, wait for other programs to finish, or increase "
-//					+ "the number of task slots in the cluster by adding more machines or increasing the number of slots "
-//					+ "per machine in conf/flink-conf.yaml .", 
-//					executionGraph.getJobName(), executionGraph.getJobID(), requiredSlots, availableSlots));
-//		}
-//
-//		// Subscribe to job status notifications
-//		executionGraph.registerJobStatusListener(this);
-//
-//		// Register execution listener for each vertex
-//		final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, true);
-//		while (it2.hasNext()) {
-//
-//			final ExecutionVertex vertex = it2.next();
-//			vertex.registerExecutionListener(new DefaultExecutionListener(this, vertex));
-//		}
-//
-//		// Register the scheduler as an execution stage listener
-//		executionGraph.registerExecutionStageListener(this);
-//
-//		// Add job to the job queue (important to add job to queue before requesting instances)
-//		synchronized (this.jobQueue) {
-//			this.jobQueue.add(executionGraph);
-//		}
-//
-//		// Request resources for the first stage of the job
-//
-//		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-//		try {
-//			requestInstances(executionStage);
-//		} catch (InstanceException e) {
-//			final String exceptionMessage = StringUtils.stringifyException(e);
-//			LOG.error(exceptionMessage);
-//			this.jobQueue.remove(executionGraph);
-//			throw new SchedulingException(exceptionMessage);
-//		}
-//	}
-//
-//	/**
-//	 * Returns the execution graph which is associated with the given job ID.
-//	 *
-//	 * @param jobID
-//	 *        the job ID to search the execution graph for
-//	 * @return the execution graph which belongs to the given job ID or <code>null</code if no such execution graph
-//	 *         exists
-//	 */
-//	public ExecutionGraph getExecutionGraphByID(final JobID jobID) {
-//
-//		synchronized (this.jobQueue) {
-//
-//			final Iterator<ExecutionGraph> it = this.jobQueue.iterator();
-//			while (it.hasNext()) {
-//
-//				final ExecutionGraph executionGraph = it.next();
-//				if (executionGraph.getJobID().equals(jobID)) {
-//					return executionGraph;
-//				}
-//			}
-//		}
-//
-//		return null;
-//	}
-//
-//
-//
-//	public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus,
-//									final String optionalMessage) {
-//
-//		if (newJobStatus == InternalJobStatus.FAILED || newJobStatus == InternalJobStatus.FINISHED
-//				|| newJobStatus == InternalJobStatus.CANCELED) {
-//			removeJobFromSchedule(executionGraph);
-//		}
-//	}
-//
-//	public void nextExecutionStageEntered(final JobID jobID, final ExecutionStage executionStage) {
-//
-//		// Request new instances if necessary
-//		try {
-//			requestInstances(executionStage);
-//		} catch (InstanceException e) {
-//			// TODO: Handle error correctly
-//			LOG.error(StringUtils.stringifyException(e));
-//		}
-//
-//		// Deploy the assigned vertices
-//		deployAssignedInputVertices(executionStage.getExecutionGraph());
-//	}
-//
-//
-//	/**
-//	 * Returns the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler.
-//	 * 
-//	 * @return the {@link org.apache.flink.runtime.instance.InstanceManager} object which is used by the current scheduler
-//	 */
-//	public InstanceManager getInstanceManager() {
-//		return this.instanceManager;
-//	}
-//
-//
-//	/**
-//	 * Collects the instances required to run the job from the given {@link ExecutionStage} and requests them at the
-//	 * loaded instance manager.
-//	 * 
-//	 * @param executionStage
-//	 *        the execution stage to collect the required instances from
-//	 * @throws InstanceException
-//	 *         thrown if the given execution graph is already processing its final stage
-//	 */
-//	protected void requestInstances(final ExecutionStage executionStage) throws InstanceException {
-//
-//		final ExecutionGraph executionGraph = executionStage.getExecutionGraph();
-//
-//		synchronized (executionStage) {
-//
-//			final int requiredSlots = executionStage.getRequiredSlots();
-//
-//			LOG.info("Requesting " + requiredSlots + " slots for job " + executionGraph.getJobID());
-//
-//			this.instanceManager.requestInstance(executionGraph.getJobID(), executionGraph.getJobConfiguration(),
-//				requiredSlots);
-//
-//			// Switch vertex state to assigning
-//			final ExecutionGraphIterator it2 = new ExecutionGraphIterator(executionGraph, executionGraph
-//				.getIndexOfCurrentExecutionStage(), true, true);
-//			while (it2.hasNext()) {
-//
-//				it2.next().compareAndUpdateExecutionState(ExecutionState.CREATED, ExecutionState.SCHEDULED);
-//			}
-//		}
-//	}
-//
-//	void findVerticesToBeDeployed(final ExecutionVertex vertex,
-//			final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed,
-//			final Set<ExecutionVertex> alreadyVisited) {
-//
-//		if (!alreadyVisited.add(vertex)) {
-//			return;
-//		}
-//
-//		if (vertex.compareAndUpdateExecutionState(ExecutionState.ASSIGNED, ExecutionState.READY)) {
-//			final Instance instance = vertex.getAllocatedResource().getInstance();
-//
-//			if (instance instanceof DummyInstance) {
-//				LOG.error("Inconsistency: Vertex " + vertex + " is about to be deployed on a DummyInstance");
-//			}
-//
-//			List<ExecutionVertex> verticesForInstance = verticesToBeDeployed.get(instance);
-//			if (verticesForInstance == null) {
-//				verticesForInstance = new ArrayList<ExecutionVertex>();
-//				verticesToBeDeployed.put(instance, verticesForInstance);
-//			}
-//
-//			verticesForInstance.add(vertex);
-//		}
-//
-//		final int numberOfOutputGates = vertex.getNumberOfOutputGates();
-//		for (int i = 0; i < numberOfOutputGates; ++i) {
-//
-//			final ExecutionGate outputGate = vertex.getOutputGate(i);
-//			boolean deployTarget;
-//
-//			switch (outputGate.getChannelType()) {
-//			case NETWORK:
-//				deployTarget = false;
-//				break;
-//			case IN_MEMORY:
-//				deployTarget = true;
-//				break;
-//			default:
-//				throw new IllegalStateException("Unknown channel type");
-//			}
-//
-//			if (deployTarget) {
-//
-//				final int numberOfOutputChannels = outputGate.getNumberOfEdges();
-//				for (int j = 0; j < numberOfOutputChannels; ++j) {
-//					final ExecutionEdge outputChannel = outputGate.getEdge(j);
-//					final ExecutionVertex connectedVertex = outputChannel.getInputGate().getVertex();
-//					findVerticesToBeDeployed(connectedVertex, verticesToBeDeployed, alreadyVisited);
-//				}
-//			}
-//		}
-//	}
-//
-//	/**
-//	 * Collects all execution vertices with the state ASSIGNED starting from the given start vertex and
-//	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-//	 * 
-//	 * @param startVertex
-//	 *        the execution vertex to start the deployment from
-//	 */
-//	public void deployAssignedVertices(final ExecutionVertex startVertex) {
-//
-//		final JobID jobID = startVertex.getExecutionGraph().getJobID();
-//
-//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-//		findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-//
-//		if (!verticesToBeDeployed.isEmpty()) {
-//
-//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-//				.entrySet()
-//				.iterator();
-//
-//			while (it2.hasNext()) {
-//
-//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-//			}
-//		}
-//	}
-//
-//	/**
-//	 * Collects all execution vertices with the state ASSIGNED from the given pipeline and deploys them on the assigned
-//	 * {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-//	 * 
-//	 * @param pipeline
-//	 *        the execution pipeline to be deployed
-//	 */
-//	public void deployAssignedPipeline(final ExecutionPipeline pipeline) {
-//
-//		final JobID jobID = null;
-//
-//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-//		final Iterator<ExecutionVertex> it = pipeline.iterator();
-//		while (it.hasNext()) {
-//			findVerticesToBeDeployed(it.next(), verticesToBeDeployed, alreadyVisited);
-//		}
-//
-//		if (!verticesToBeDeployed.isEmpty()) {
-//
-//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-//				.entrySet()
-//				.iterator();
-//
-//			while (it2.hasNext()) {
-//
-//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-//			}
-//		}
-//	}
-//
-//	/**
-//	 * Collects all execution vertices with the state ASSIGNED starting from the given collection of start vertices and
-//	 * deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-//	 * 
-//	 * @param startVertices
-//	 *        the collection of execution vertices to start the deployment from
-//	 */
-//	public void deployAssignedVertices(final Collection<ExecutionVertex> startVertices) {
-//
-//		JobID jobID = null;
-//
-//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-//		for (final ExecutionVertex startVertex : startVertices) {
-//
-//			if (jobID == null) {
-//				jobID = startVertex.getExecutionGraph().getJobID();
-//			}
-//
-//			findVerticesToBeDeployed(startVertex, verticesToBeDeployed, alreadyVisited);
-//		}
-//
-//		if (!verticesToBeDeployed.isEmpty()) {
-//
-//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-//				.entrySet()
-//				.iterator();
-//
-//			while (it2.hasNext()) {
-//
-//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-//				this.deploymentManager.deploy(jobID, entry.getKey(), entry.getValue());
-//			}
-//		}
-//	}
-//
-//	/**
-//	 * Collects all execution vertices with the state ASSIGNED starting from the input vertices of the current execution
-//	 * stage and deploys them on the assigned {@link org.apache.flink.runtime.instance.AllocatedResource} objects.
-//	 * 
-//	 * @param executionGraph
-//	 *        the execution graph to collect the vertices from
-//	 */
-//	public void deployAssignedInputVertices(final ExecutionGraph executionGraph) {
-//
-//		final Map<Instance, List<ExecutionVertex>> verticesToBeDeployed = new HashMap<Instance, List<ExecutionVertex>>();
-//		final ExecutionStage executionStage = executionGraph.getCurrentExecutionStage();
-//
-//		final Set<ExecutionVertex> alreadyVisited = new HashSet<ExecutionVertex>();
-//
-//		for (int i = 0; i < executionStage.getNumberOfStageMembers(); ++i) {
-//
-//			final ExecutionGroupVertex startVertex = executionStage.getStageMember(i);
-//			if (!startVertex.isInputVertex()) {
-//				continue;
-//			}
-//
-//			for (int j = 0; j < startVertex.getCurrentNumberOfGroupMembers(); ++j) {
-//				final ExecutionVertex vertex = startVertex.getGroupMember(j);
-//				findVerticesToBeDeployed(vertex, verticesToBeDeployed, alreadyVisited);
-//			}
-//		}
-//
-//		if (!verticesToBeDeployed.isEmpty()) {
-//
-//			final Iterator<Map.Entry<Instance, List<ExecutionVertex>>> it2 = verticesToBeDeployed
-//				.entrySet()
-//				.iterator();
-//
-//			while (it2.hasNext()) {
-//
-//				final Map.Entry<Instance, List<ExecutionVertex>> entry = it2.next();
-//				this.deploymentManager.deploy(executionGraph.getJobID(), entry.getKey(), entry.getValue());
-//			}
-//		}
-//	}
-//
-//
-//	@Override
-//	public void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-//
-//		if (allocatedResources == null) {
-//			LOG.error("Resource to lock is null!");
-//			return;
-//		}
-//
-//		for (final AllocatedResource allocatedResource : allocatedResources) {
-//			if (allocatedResource.getInstance() instanceof DummyInstance) {
-//				LOG.debug("Available instance is of type DummyInstance!");
-//				return;
-//			}
-//		}
-//
-//		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-//
-//		if (eg == null) {
-//			/*
-//			 * The job have have been canceled in the meantime, in this case
-//			 * we release the instance immediately.
-//			 */
-//			try {
-//				for (final AllocatedResource allocatedResource : allocatedResources) {
-//					getInstanceManager().releaseAllocatedResource(allocatedResource);
-//				}
-//			} catch (InstanceException e) {
-//				LOG.error(e);
-//			}
-//			return;
-//		}
-//
-//		final Runnable command = new Runnable() {
-//
-//			/**
-//			 * {@inheritDoc}
-//			 */
-//			@Override
-//			public void run() {
-//
-//				final ExecutionStage stage = eg.getCurrentExecutionStage();
-//
-//				synchronized (stage) {
-//
-//					for (final AllocatedResource allocatedResource : allocatedResources) {
-//
-//						AllocatedResource resourceToBeReplaced = null;
-//						// Important: only look for instances to be replaced in the current stage
-//						final Iterator<ExecutionGroupVertex> groupIterator = new ExecutionGroupVertexIterator(eg, true,
-//							stage.getStageNumber());
-//						while (groupIterator.hasNext()) {
-//
-//							final ExecutionGroupVertex groupVertex = groupIterator.next();
-//							for (int i = 0; i < groupVertex.getCurrentNumberOfGroupMembers(); ++i) {
-//
-//								final ExecutionVertex vertex = groupVertex.getGroupMember(i);
-//
-//								if (vertex.getExecutionState() == ExecutionState.SCHEDULED
-//									&& vertex.getAllocatedResource() != null) {
-//										resourceToBeReplaced = vertex.getAllocatedResource();
-//										break;
-//								}
-//							}
-//
-//							if (resourceToBeReplaced != null) {
-//								break;
-//							}
-//						}
-//
-//						// For some reason, we don't need this instance
-//						if (resourceToBeReplaced == null) {
-//							LOG.error("Instance " + allocatedResource.getInstance() + " is not required for job"
-//								+ eg.getJobID());
-//							try {
-//								getInstanceManager().releaseAllocatedResource(allocatedResource);
-//							} catch (InstanceException e) {
-//								LOG.error(e);
-//							}
-//							return;
-//						}
-//
-//						// Replace the selected instance
-//						final Iterator<ExecutionVertex> it = resourceToBeReplaced.assignedVertices();
-//						while (it.hasNext()) {
-//							final ExecutionVertex vertex = it.next();
-//							vertex.setAllocatedResource(allocatedResource);
-//							vertex.updateExecutionState(ExecutionState.ASSIGNED);
-//						}
-//					}
-//				}
-//
-//				// Deploy the assigned vertices
-//				deployAssignedInputVertices(eg);
-//
-//			}
-//
-//		};
-//
-//		eg.executeCommand(command);
-//	}
-//
-//	/**
-//	 * Checks if the given {@link AllocatedResource} is still required for the
-//	 * execution of the given execution graph. If the resource is no longer
-//	 * assigned to a vertex that is either currently running or about to run
-//	 * the given resource is returned to the instance manager for deallocation.
-//	 * 
-//	 * @param executionGraph
-//	 *        the execution graph the provided resource has been used for so far
-//	 * @param allocatedResource
-//	 *        the allocated resource to check the assignment for
-//	 */
-//	public void checkAndReleaseAllocatedResource(final ExecutionGraph executionGraph,
-//			final AllocatedResource allocatedResource) {
-//
-//		if (allocatedResource == null) {
-//			LOG.error("Resource to lock is null!");
-//			return;
-//		}
-//
-//		if (allocatedResource.getInstance() instanceof DummyInstance) {
-//			LOG.debug("Available instance is of type DummyInstance!");
-//			return;
-//		}
-//
-//		boolean resourceCanBeReleased = true;
-//		final Iterator<ExecutionVertex> it = allocatedResource.assignedVertices();
-//		while (it.hasNext()) {
-//			final ExecutionVertex vertex = it.next();
-//			final ExecutionState state = vertex.getExecutionState();
-//
-//			if (state != ExecutionState.CREATED && state != ExecutionState.FINISHED
-//				&& state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
-//
-//				resourceCanBeReleased = false;
-//				break;
-//			}
-//		}
-//
-//		if (resourceCanBeReleased) {
-//
-//			LOG.info("Releasing instance " + allocatedResource.getInstance());
-//			try {
-//				getInstanceManager().releaseAllocatedResource(allocatedResource);
-//			} catch (InstanceException e) {
-//				LOG.error(StringUtils.stringifyException(e));
-//			}
-//		}
-//	}
-//
-//	DeploymentManager getDeploymentManager() {
-//		return this.deploymentManager;
-//	}
-//
-//
-//
-//	@Override
-//	public void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
-//
-//		final ExecutionGraph eg = getExecutionGraphByID(jobID);
-//
-//		if (eg == null) {
-//			LOG.error("Cannot find execution graph for job with ID " + jobID);
-//			return;
-//		}
-//
-//		final Runnable command = new Runnable() {
-//
-//			/**
-//			 * {@inheritDoc}
-//			 */
-//			@Override
-//			public void run() {
-//
-//				synchronized (eg) {
-//
-//					for (final AllocatedResource allocatedResource : allocatedResources) {
-//
-//						LOG.info("Resource " + allocatedResource.getInstance().getName() + " for Job " + jobID
-//							+ " died.");
-//
-//						final ExecutionGraph executionGraph = getExecutionGraphByID(jobID);
-//
-//						if (executionGraph == null) {
-//							LOG.error("Cannot find execution graph for job " + jobID);
-//							return;
-//						}
-//
-//						Iterator<ExecutionVertex> vertexIter = allocatedResource.assignedVertices();
-//
-//						// Assign vertices back to a dummy resource.
-//						final DummyInstance dummyInstance = DummyInstance.createDummyInstance();
-//						final AllocatedResource dummyResource = new AllocatedResource(dummyInstance,
-//								new AllocationID());
-//
-//						while (vertexIter.hasNext()) {
-//							final ExecutionVertex vertex = vertexIter.next();
-//							vertex.setAllocatedResource(dummyResource);
-//						}
-//
-//						final String failureMessage = allocatedResource.getInstance().getName() + " died";
-//
-//						vertexIter = allocatedResource.assignedVertices();
-//
-//						while (vertexIter.hasNext()) {
-//							final ExecutionVertex vertex = vertexIter.next();
-//							final ExecutionState state = vertex.getExecutionState();
-//
-//							switch (state) {
-//							case ASSIGNED:
-//							case READY:
-//							case STARTING:
-//							case RUNNING:
-//							case FINISHING:
-//
-//							vertex.updateExecutionState(ExecutionState.FAILED, failureMessage);
-//
-//							break;
-//						default:
-//							}
-//					}
-//
-//					// TODO: Fix this
-//					/*
-//					 * try {
-//					 * requestInstances(this.executionVertex.getGroupVertex().getExecutionStage());
-//					 * } catch (InstanceException e) {
-//					 * e.printStackTrace();
-//					 * // TODO: Cancel the entire job in this case
-//					 * }
-//					 */
-//				}
-//			}
-//
-//			final InternalJobStatus js = eg.getJobStatus();
-//			if (js != InternalJobStatus.FAILING && js != InternalJobStatus.FAILED) {
-//
-//				// TODO: Fix this
-//				// deployAssignedVertices(eg);
-//
-//				final ExecutionStage stage = eg.getCurrentExecutionStage();
-//
-//				try {
-//					requestInstances(stage);
-//				} catch (InstanceException e) {
-//					e.printStackTrace();
-//					// TODO: Cancel the entire job in this case
-//				}
-//			}
-//		}
-//		};
-//
-//		eg.executeCommand(command);
-//	}
 	
 	// --------------------------------------------------------------------------------------------
-	//  Canceling
+	//  Scheduling
 	// --------------------------------------------------------------------------------------------
 	
-	public void removeAllTasksForJob(JobID job) {
+	/**
+	 * @param task The unit to schedule; must not be {@code null}.
+	 * @param queueIfNoResource If true, this call will queue the request if no resource is immediately
+	 *                          available. If false, it will throw a {@link NoResourceAvailableException}
+	 *                          if no resource is immediately available.
+	 */
+	public void scheduleTask(ScheduledUnit task, boolean queueIfNoResource) {
+		if (task == null) {
+			throw new IllegalArgumentException();
+		}
+		
+		// if there is already a slot for that resource
+		AllocatedSlot existing = this.allocatedSlots.get(task.getSharedResourceId());
+		if (existing != null) {
+			// try to attach to the existing slot
+			if (existing.runTask(task.getTaskVertex())) {
+				// all good, we are done
+				return;
+			}
+			// else: the slot was deallocated, we need to proceed as if there was none
+		}
+		
+		// check if there is a slot that has an available sub-slot for that group-vertex
+		// TODO
+		
 		
 	}
 
@@ -836,101 +233,114 @@ public class DefaultScheduler implements InstanceListener {
 	//  Scheduling
 	// --------------------------------------------------------------------------------------------
 	
-	/**
-	 * Schedules the given unit to an available resource. This call blocks if no resource
-	 * is currently available
-	 * 
-	 * @param unit The unit to be scheduled.
-	 */
-	protected void scheduleNextUnit(ScheduledUnit unit) {
-		if (unit == null) {
-			throw new IllegalArgumentException("Unit to schedule must not be null.");
-		}
-		
-		// see if the resource Id has already an assigned resource
-		AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
-		
-		if (resource == null) {
-			// not yet allocated. find a slot to schedule to
-			try {
-				resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
-				if (resource == null) {
-					throw new RuntimeException("Error: The resource to schedule to is null.");
-				}
-			}
-			catch (Exception e) {
-				// we cannot go on, the task needs to know what to do now.
-				unit.getTaskVertex().handleException(e);
-				return;
-			}
-		}
-		
-		resource.runTask(unit.getTaskVertex());
-	}
+//	/**
+//	 * Schedules the given unit to an available resource. This call blocks if no resource
+//	 * is currently available
+//	 * 
+//	 * @param unit The unit to be scheduled.
+//	 */
+//	protected void scheduleQueuedUnit(ScheduledUnit unit) {
+//		if (unit == null) {
+//			throw new IllegalArgumentException("Unit to schedule must not be null.");
+//		}
+//		
+//		// see if the resource Id has already an assigned resource
+//		AllocatedSlot resource = this.allocatedSlots.get(unit.getSharedResourceId());
+//		
+//		if (resource == null) {
+//			// not yet allocated. find a slot to schedule to
+//			try {
+//				resource = getResourceToScheduleUnit(unit, this.rejectIfNoResourceAvailable);
+//				if (resource == null) {
+//					throw new RuntimeException("Error: The resource to schedule to is null.");
+//				}
+//			}
+//			catch (Exception e) {
+//				// we cannot go on, the task needs to know what to do now.
+//				unit.getTaskVertex().handleException(e);
+//				return;
+//			}
+//		}
+//		
+//		resource.runTask(unit.getTaskVertex());
+//	}
 	
 	/**
 	 * Acquires a resource to schedule the given unit to. This call may block if no
 	 * resource is currently available, or throw an exception, based on the given flag.
 	 * 
 	 * @param unit The unit to find a resource for.
-	 * @param exceptionOnNoAvailability If true, this call should not block is no resource is available,
-	 *                                  but throw a {@link NoResourceAvailableException}.
 	 * @return The resource to schedule the execution of the given unit on.
 	 * 
 	 * @throws NoResourceAvailableException If the {@code exceptionOnNoAvailability} flag is true and the scheduler
 	 *                                      has currently no resources available.
 	 */
-	protected AllocatedSlot getResourceToScheduleUnit(ScheduledUnit unit, boolean exceptionOnNoAvailability) 
+	protected AllocatedSlot getNewSlotForTask(ScheduledUnit unit, boolean queueIfNoResource) 
 		throws NoResourceAvailableException
 	{
-		AllocatedSlot slot = null;
-		
-		while (true) {
-			synchronized (this.lock) {
-				Instance instanceToUse = this.instancesWithAvailableResources.poll();
-				
-				// if there is nothing, throw an exception or wait, depending on what is configured
-				if (instanceToUse == null) {
-					if (exceptionOnNoAvailability) {
-						throw new NoResourceAvailableException(unit);
+		synchronized (this.lock) {
+			Instance instanceToUse = this.instancesWithAvailableResources.poll();
+			
+			// if an instance with available resources was found, try to allocate a slot on it
+			if (instanceToUse != null) {
+				try {
+					AllocatedSlot slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
+					
+					// if the instance has further available slots, re-add it to the set of available resources.
+					if (instanceToUse.hasResourcesAvailable()) {
+						this.instancesWithAvailableResources.add(instanceToUse);
 					}
-					else {
-						try {
-							do {
-								this.lock.wait(2000);
-							}
-							while (!shutdown.get() && 
-									(instanceToUse = this.instancesWithAvailableResources.poll()) == null);
-						}
-						catch (InterruptedException e) {
-							throw new NoResourceAvailableException("The scheduler was interrupted.");
+					
+					if (slot != null) {
+						AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
+						if (previous != null) {
+							// concurrently, someone allocated a slot for that ID
+							// release the new one
+							slot.cancelResource();
+							slot = previous;
 						}
 					}
-				}
-				
-				// at this point, we have an instance. request a slot from the instance
-				try {
-					slot = instanceToUse.allocateSlot(unit.getJobId(), unit.getSharedResourceId());
+					// else: the instance did not provide a slot
 				}
 				catch (InstanceDiedException e) {
 					// the instance died it has not yet been propagated to this scheduler
 					// remove the instance from the set of available instances
 					this.allInstances.remove(instanceToUse);
 				}
+			}
+				
+			
+			if (queueIfNoResource) {
+				this.taskQueue.add(unit);
+			}
+			else {
+				throw new NoResourceAvailableException(unit);
+			}
+				// at this point, we have an instance. request a slot from the instance
+				
 				
 				// if the instance has further available slots, re-add it to the set of available
 				// resources.
-				// if it does not, but asynchronously 
+				// if it does not, but asynchronously a slot became available, we may attempt to add the
+				// instance twice, which does not matter because of the set semantics of the "instancesWithAvailableResources"
 				if (instanceToUse.hasResourcesAvailable()) {
 					this.instancesWithAvailableResources.add(instanceToUse);
 				}
 				
 				if (slot != null) {
-					return slot;
+					AllocatedSlot previous = this.allocatedSlots.putIfAbsent(unit.getSharedResourceId(), slot);
+					if (previous != null) {
+						// concurrently, someone allocated a slot for that ID
+						// release the new one
+						slot.cancelResource();
+						slot = previous;
+					}
 				}
 				// else fall through the loop
 			}
 		}
+		
+		return slot;
 	}
 	
 	protected void runSchedulerLoop() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
index 8caf64a..338529f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduledUnit.java
@@ -15,6 +15,8 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
 import org.apache.flink.runtime.jobgraph.JobID;
 
@@ -26,6 +28,8 @@ public class ScheduledUnit {
 	
 	private final ResourceId resourceId;
 	
+	private final AtomicBoolean scheduled = new AtomicBoolean(false);
+	
 	
 	public ScheduledUnit(JobID jobId, ExecutionVertex2 taskVertex) {
 		this(jobId, taskVertex, new ResourceId());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 02c814c..b047222 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -251,5 +251,15 @@ public class LocalInstanceManagerTest {
 
 		@Override
 		public void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed) {}
+
+		@Override
+		public boolean sendHeartbeat(InstanceID taskManagerId) {
+			return true;
+		}
+
+		@Override
+		public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
+			return new InstanceID();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa7550aa/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
new file mode 100644
index 0000000..d45b131
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/ipc/RpcTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.ipc;
+
+import static org.junit.Assert.*;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.LogUtils;
+
+
+public class RpcTest {
+
+	@BeforeClass
+	public static void initLogger() {
+		LogUtils.initializeDefaultConsoleLogger();
+	}
+
+	/**
+	 * Round-trips calls through an RPC proxy; any exception propagates to JUnit as a failure.
+	 */
+	@Test
+	public void testRpc() throws Exception {
+		Server server = null;
+		TestProtocol proxy = null;
+		try {
+			// setup the RPCs
+			int port = getAvailablePort();
+			server = RPC.getServer(new TestProtocolImpl(), "localhost", port, 4);
+			server.start();
+
+			proxy = RPC.getProxy(TestProtocol.class, new InetSocketAddress("localhost", port), NetUtils.getSocketFactory());
+
+			// NOTE(review): the methodWithNoParameters() call was disabled in this test;
+			// confirm whether the RPC layer supports it before enabling.
+			assertEquals(19, proxy.methodWithPrimitives(16, new StringValue("abc")));
+			assertEquals(new DoubleValue(17.0), proxy.methodWithWritables(new LongValue(17)));
+		}
+		finally {
+			// stop the server even if stopping the proxy throws
+			try {
+				if (proxy != null) {
+					RPC.stopProxy(proxy);
+				}
+			}
+			finally {
+				if (server != null) {
+					server.stop();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Finds a port that is currently free by binding a socket to port 0 and releasing it again.
+	 *
+	 * @throws IOException if none of the 50 attempts succeeds; the last failure is the cause
+	 */
+	private static int getAvailablePort() throws IOException {
+		IOException lastFailure = null;
+		for (int i = 0; i < 50; i++) {
+			try {
+				ServerSocket serverSocket = new ServerSocket(0);
+				try {
+					return serverSocket.getLocalPort();
+				}
+				finally {
+					serverSocket.close();
+				}
+			}
+			catch (IOException e) {
+				lastFailure = e; // bind or close failed: retry, but remember why
+			}
+		}
+		throw new IOException("Could not find a free port.", lastFailure);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Protocol with a no-arg void method, a primitive-typed method and a Value-typed method. */
+	public static interface TestProtocol extends VersionedProtocol {
+		public void methodWithNoParameters();
+
+		public int methodWithPrimitives(int intParam, StringValue writableParam);
+
+		public DoubleValue methodWithWritables(LongValue writableParam);
+	}
+
+	/** Implementation served by the test; each result is derived from the call's arguments. */
+	public static final class TestProtocolImpl implements TestProtocol {
+		@Override
+		public void methodWithNoParameters() {}
+
+		@Override
+		public int methodWithPrimitives(int intParam, StringValue writableParam) {
+			return intParam + writableParam.length();
+		}
+
+		@Override
+		public DoubleValue methodWithWritables(LongValue writableParam) {
+			return new DoubleValue(writableParam.getValue());
+		}
+	}
+}