You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:44 UTC

[08/10] flink git commit: [FLINK-377] Generic Interface

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
new file mode 100644
index 0000000..d45b019
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
+
+/**
+ * General-purpose class to write data to memory-mapped files.
+ */
+public class Sender implements Serializable {
+	public static final byte TYPE_TUPLE = (byte) 11; // getSerializer() writes the tuple arity as an int right after this id
+	public static final byte TYPE_BOOLEAN = (byte) 10;
+	public static final byte TYPE_BYTE = (byte) 9;
+	public static final byte TYPE_SHORT = (byte) 8;
+	public static final byte TYPE_INTEGER = (byte) 7;
+	public static final byte TYPE_LONG = (byte) 6;
+	public static final byte TYPE_DOUBLE = (byte) 4;
+	public static final byte TYPE_FLOAT = (byte) 5;
+	public static final byte TYPE_CHAR = (byte) 3;
+	public static final byte TYPE_STRING = (byte) 2;
+	public static final byte TYPE_BYTES = (byte) 1;
+	public static final byte TYPE_NULL = (byte) 0;
+
+	private final AbstractRichFunction function; // when null, the file is created at FLINK_TMP_DATA_DIR/input instead of the given path
+
+	private File outputFile;
+	private RandomAccessFile outputRAF;
+	private FileChannel outputChannel;
+	private MappedByteBuffer fileBuffer; // mapping of the output file; type ids and records are written here
+
+	private final ByteBuffer[] saved = new ByteBuffer[2]; // per group: a serialized record that did not fit into the previous buffer
+
+	private final Serializer[] serializer = new Serializer[2]; // per group: serializer derived from the first record of that group
+
+	public Sender(AbstractRichFunction function) { // function may be null, setupMappedFile() then falls back to a default file
+		this.function = function;
+	}
+
+	//=====Setup========================================================================================================
+	public void open(String path) throws IOException { // creates and maps the output file; the send methods write into that mapping
+		setupMappedFile(path);
+	}
+
+	/**
+	 * Creates a fresh file of MAPPED_FILE_SIZE bytes and maps it into memory for reading and writing. An already
+	 * existing file at the same location is deleted first.
+	 *
+	 * @param path location of the file; ignored when this sender was created without a function, in which case
+	 * FLINK_TMP_DATA_DIR/input is used
+	 * @throws FileNotFoundException if the file cannot be opened in "rw" mode
+	 * @throws IOException if the file cannot be created, resized or mapped
+	 */
+	private void setupMappedFile(String path) throws FileNotFoundException, IOException {
+		String outputFilePath = function == null
+				? FLINK_TMP_DATA_DIR + "/" + "input"
+				: path;
+
+		File tmpDir = new File(FLINK_TMP_DATA_DIR);
+		tmpDir.mkdirs();
+
+		outputFile = new File(outputFilePath);
+		if (outputFile.exists()) {
+			outputFile.delete();
+		}
+		outputFile.createNewFile();
+		outputRAF = new RandomAccessFile(outputFilePath, "rw");
+		boolean mapped = false;
+		try {
+			outputRAF.setLength(MAPPED_FILE_SIZE);
+			//write the last byte of the file, then rewind to the start
+			outputRAF.seek(MAPPED_FILE_SIZE - 1);
+			outputRAF.writeByte(0);
+			outputRAF.seek(0);
+			outputChannel = outputRAF.getChannel();
+			fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+			mapped = true;
+		} finally {
+			if (!mapped) {
+				//do not leak the file handle when resizing or mapping fails
+				outputRAF.close();
+			}
+		}
+	}
+
+	public void close() throws IOException { // closes the channel and the file that back the mapping
+		closeMappedFile();
+	}
+
+	/**
+	 * Closes the channel and the file backing the mapping. Does nothing for resources that were never opened.
+	 *
+	 * @throws IOException if closing the channel or the file fails
+	 */
+	private void closeMappedFile() throws IOException {
+		try {
+			if (outputChannel != null) {
+				outputChannel.close();
+			}
+		} finally {
+			//always release the file handle, even if closing the channel failed
+			if (outputRAF != null) {
+				outputRAF.close();
+			}
+		}
+	}
+
+	/**
+	 * Resets this object to the post-configuration state: the serializers of both groups and any record held back
+	 * by a previous sendBuffer() call are dropped, and the mapped buffer is rewound.
+	 */
+	public void reset() {
+		for (int group = 0; group < serializer.length; group++) {
+			serializer[group] = null;
+			//a held-back record belongs to the dropped serializer and must not leak into the next data set
+			saved[group] = null;
+		}
+		fileBuffer.clear();
+	}
+
+	//=====Serialization================================================================================================
+	/**
+	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
+	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
+	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
+	 * absolutely necessary.
+	 *
+	 * @param value record to send
+	 * @return size of the written buffer, including the type header
+	 * @throws IOException
+	 * @throws RuntimeException if the record does not fit into the space left after its type header
+	 */
+	public int sendRecord(Object value) throws IOException {
+		fileBuffer.clear();
+		int group = 0;
+
+		serializer[group] = getSerializer(value);
+		ByteBuffer bb = serializer[group].serialize(value);
+		//getSerializer() already wrote the type header, so only the remaining space is available
+		if (bb.remaining() > fileBuffer.remaining()) {
+			throw new RuntimeException("Serialized object does not fit into a single buffer.");
+		}
+		fileBuffer.put(bb);
+
+		int size = fileBuffer.position();
+
+		reset();
+		return size;
+	}
+
+	public boolean hasRemaining(int group) { // true while a record held back by sendBuffer() is waiting for this group
+		return saved[group] != null;
+	}
+
+	/**
+	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+	 * guarantee that the file may be written to before calling this method.
+	 *
+	 * A record that no longer fits into the buffer is held back and written first by the next call for the same
+	 * group; {@link #hasRemaining(int)} reports whether such a record exists.
+	 *
+	 * @param i iterator containing records
+	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
+	 * @return size of the written buffer
+	 * @throws IOException
+	 * @throws RuntimeException if a single record does not fit into an otherwise empty buffer
+	 */
+	public int sendBuffer(Iterator i, int group) throws IOException {
+		fileBuffer.clear();
+
+		Object value;
+		ByteBuffer bb;
+		if (serializer[group] == null) {
+			value = i.next();
+			serializer[group] = getSerializer(value);
+			bb = serializer[group].serialize(value);
+			//the type header written by getSerializer() already occupies part of the buffer
+			if (bb.remaining() > fileBuffer.remaining()) {
+				throw new RuntimeException("Serialized object does not fit into a single buffer.");
+			}
+			fileBuffer.put(bb);
+
+		}
+		if (saved[group] != null) {
+			fileBuffer.put(saved[group]);
+			saved[group] = null;
+		}
+		while (i.hasNext() && saved[group] == null) {
+			value = i.next();
+			bb = serializer[group].serialize(value);
+			if (bb.remaining() > MAPPED_FILE_SIZE) {
+				throw new RuntimeException("Serialized object does not fit into a single buffer.");
+			}
+			if (bb.remaining() <= fileBuffer.remaining()) {
+				fileBuffer.put(bb);
+			} else {
+				//buffer is full: keep the record for the next call and stop filling
+				saved[group] = bb;
+			}
+		}
+
+		int size = fileBuffer.position();
+		return size;
+	}
+
+	private enum SupportedTypes { // getSerializer() maps upper-cased simple class names onto these constants via valueOf()
+		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
+	}
+
+	//=====Serializer===================================================================================================
+	/**
+	 * Writes the type header for the given value into the mapped file and returns a matching serializer. For tuples
+	 * the header is followed by the arity and, through the nested lookups done by the TupleSerializer, by the
+	 * headers of all fields.
+	 *
+	 * @param value sample value that determines the type; null is transferred as TYPE_NULL
+	 * @return serializer for values of the same type as the sample
+	 * @throws IOException
+	 * @throws IllegalArgumentException if the type of the value is not supported
+	 */
+	private Serializer getSerializer(Object value) throws IOException {
+		SupportedTypes type;
+		if (value == null) {
+			//null has no class to inspect
+			type = SupportedTypes.NULL;
+		} else {
+			//Locale.ROOT keeps the lookup independent of the default locale (e.g. the Turkish dotted capital I)
+			String className = value.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT);
+			if (className.startsWith("TUPLE")) {
+				className = "TUPLE";
+			}
+			if (className.startsWith("BYTE[]")) {
+				className = "BYTES";
+			}
+			try {
+				type = SupportedTypes.valueOf(className);
+			} catch (IllegalArgumentException ex) {
+				//report the offending class instead of the bare "No enum constant" message
+				throw new IllegalArgumentException("Unknown Type encountered: " + value.getClass().getName(), ex);
+			}
+		}
+		switch (type) {
+			case TUPLE:
+				fileBuffer.put(TYPE_TUPLE);
+				fileBuffer.putInt(((Tuple) value).getArity());
+				return new TupleSerializer((Tuple) value);
+			case BOOLEAN:
+				fileBuffer.put(TYPE_BOOLEAN);
+				return new BooleanSerializer();
+			case BYTE:
+				fileBuffer.put(TYPE_BYTE);
+				return new ByteSerializer();
+			case BYTES:
+				fileBuffer.put(TYPE_BYTES);
+				return new BytesSerializer();
+			case CHARACTER:
+				fileBuffer.put(TYPE_CHAR);
+				return new CharSerializer();
+			case SHORT:
+				fileBuffer.put(TYPE_SHORT);
+				return new ShortSerializer();
+			case INTEGER:
+				fileBuffer.put(TYPE_INTEGER);
+				return new IntSerializer();
+			case LONG:
+				fileBuffer.put(TYPE_LONG);
+				return new LongSerializer();
+			case STRING:
+				fileBuffer.put(TYPE_STRING);
+				return new StringSerializer();
+			case FLOAT:
+				fileBuffer.put(TYPE_FLOAT);
+				return new FloatSerializer();
+			case DOUBLE:
+				fileBuffer.put(TYPE_DOUBLE);
+				return new DoubleSerializer();
+			case NULL:
+				fileBuffer.put(TYPE_NULL);
+				return new NullSerializer();
+			default:
+				throw new IllegalArgumentException("Unknown Type encountered: " + type);
+		}
+	}
+
+	private abstract class Serializer<T> { // base class: subclasses write exactly one value into buffer
+		protected ByteBuffer buffer; // reused between calls; some subclasses replace it for every value
+
+		public Serializer(int capacity) {
+			buffer = ByteBuffer.allocate(capacity);
+		}
+
+		public ByteBuffer serialize(T value) { // returns the internal buffer, flipped so it is ready to be read
+			buffer.clear();
+			serializeInternal(value);
+			buffer.flip();
+			return buffer;
+		}
+
+		public abstract void serializeInternal(T value); // writes the value into buffer without flipping it
+	}
+
+	private class ByteSerializer extends Serializer<Byte> { // writes the value as one byte
+		public ByteSerializer() {
+			super(1);
+		}
+
+		@Override
+		public void serializeInternal(Byte value) {
+			buffer.put(value);
+		}
+	}
+
+	private class BooleanSerializer extends Serializer<Boolean> { // writes one byte: 1 for true, 0 for false
+		public BooleanSerializer() {
+			super(1);
+		}
+
+		@Override
+		public void serializeInternal(Boolean value) {
+			buffer.put(value ? (byte) 1 : (byte) 0);
+		}
+	}
+
+	private class CharSerializer extends Serializer<Character> { // writes the encoded bytes of the character, without a length prefix
+		public CharSerializer() {
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Character value) {
+			buffer.put((value + "").getBytes()); // NOTE(review): uses the platform default charset - confirm the receiver decodes with the same one
+		}
+	}
+
+	private class ShortSerializer extends Serializer<Short> { // writes the value as 2 bytes
+		public ShortSerializer() {
+			super(2);
+		}
+
+		@Override
+		public void serializeInternal(Short value) {
+			buffer.putShort(value);
+		}
+	}
+
+	private class IntSerializer extends Serializer<Integer> { // writes the value as 4 bytes
+		public IntSerializer() {
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Integer value) {
+			buffer.putInt(value);
+		}
+	}
+
+	private class LongSerializer extends Serializer<Long> { // writes the value as 8 bytes
+		public LongSerializer() {
+			super(8);
+		}
+
+		@Override
+		public void serializeInternal(Long value) {
+			buffer.putLong(value);
+		}
+	}
+
+	private class StringSerializer extends Serializer<String> { // writes the byte count as an int, followed by the encoded bytes
+		public StringSerializer() {
+			super(0); // placeholder only, a buffer of the exact size is allocated for every value
+		}
+
+		@Override
+		public void serializeInternal(String value) {
+			byte[] bytes = value.getBytes(); // NOTE(review): uses the platform default charset - confirm the receiver decodes with the same one
+			buffer = ByteBuffer.allocate(bytes.length + 4);
+			buffer.putInt(bytes.length);
+			buffer.put(bytes);
+		}
+	}
+
+	private class FloatSerializer extends Serializer<Float> { // writes the value as 4 bytes
+		public FloatSerializer() {
+			super(4);
+		}
+
+		@Override
+		public void serializeInternal(Float value) {
+			buffer.putFloat(value);
+		}
+	}
+
+	private class DoubleSerializer extends Serializer<Double> { // writes the value as 8 bytes
+		public DoubleSerializer() {
+			super(8);
+		}
+
+		@Override
+		public void serializeInternal(Double value) {
+			buffer.putDouble(value);
+		}
+	}
+
+	private class NullSerializer extends Serializer<Object> { // writes no payload, so serialize() returns an empty buffer
+		public NullSerializer() {
+			super(0);
+		}
+
+		@Override
+		public void serializeInternal(Object value) {
+		}
+	}
+
+	private class BytesSerializer extends Serializer<byte[]> { // writes the array length as an int, followed by the array
+		public BytesSerializer() {
+			super(0); // placeholder only, a buffer of the exact size is allocated for every value
+		}
+
+		@Override
+		public void serializeInternal(byte[] value) {
+			buffer = ByteBuffer.allocate(4 + value.length);
+			buffer.putInt(value.length);
+			buffer.put(value);
+		}
+	}
+
+	private class TupleSerializer extends Serializer<Tuple> {
+		private final Serializer[] serializer;
+		private final List<ByteBuffer> buffers;
+
+		public TupleSerializer(Tuple value) throws IOException {
+			super(0);
+			serializer = new Serializer[value.getArity()];
+			buffers = new ArrayList();
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x] = getSerializer(value.getField(x));
+			}
+		}
+
+		@Override
+		public void serializeInternal(Tuple value) {
+			int length = 0;
+			for (int x = 0; x < serializer.length; x++) {
+				serializer[x].buffer.clear();
+				serializer[x].serializeInternal(value.getField(x));
+				length += serializer[x].buffer.position();
+				buffers.add(serializer[x].buffer);
+			}
+			buffer = ByteBuffer.allocate(length);
+			for (ByteBuffer b : buffers) {
+				b.flip();
+				buffer.put(b);
+			}
+			buffers.clear();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
new file mode 100644
index 0000000..1ad0606
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Simple utility class to print all contents of an inputstream to stdout.
+ */
+public class StreamPrinter extends Thread {
+	private final BufferedReader reader;
+	private final boolean wrapInException;
+	private final StringBuilder msg;
+
+	/**
+	 * Creates a printer that forwards every line of the stream to stdout.
+	 *
+	 * @param stream stream to read until its end
+	 */
+	public StreamPrinter(InputStream stream) {
+		this(stream, false, null);
+	}
+
+	/**
+	 * Creates a printer that either forwards every line of the stream to stdout or collects the lines in a buffer.
+	 *
+	 * @param stream stream to read until its end
+	 * @param wrapInException if true, lines are appended to msg instead of being printed
+	 * @param msg buffer that collects the lines; only used, and then required, if wrapInException is true
+	 * @throws IllegalArgumentException if wrapInException is true but msg is null
+	 */
+	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
+		if (wrapInException && msg == null) {
+			//fail here instead of with a NullPointerException inside the background thread
+			throw new IllegalArgumentException("A message buffer is required when wrapInException is set.");
+		}
+		this.reader = new BufferedReader(new InputStreamReader(stream));
+		this.wrapInException = wrapInException;
+		this.msg = msg;
+	}
+
+	/**
+	 * Reads the stream line by line until it ends or fails.
+	 */
+	@Override
+	public void run() {
+		String line;
+		try {
+			if (wrapInException) {
+				while ((line = reader.readLine()) != null) {
+					msg.append('\n').append(line);
+				}
+			} else {
+				while ((line = reader.readLine()) != null) {
+					System.out.println(line);
+					System.out.flush();
+				}
+			}
+		} catch (IOException ignored) {
+			//best effort: once reading fails there is nothing further to forward
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
new file mode 100644
index 0000000..1a96e98
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the basis for using an external process within a Java Flink operator. It contains logic to send and
+ * receive data, while taking care of synchronization.
+ */
+public abstract class Streamer implements Serializable {
+	protected static final Logger LOG = LoggerFactory.getLogger(Streamer.class);
+	private static final int SIGNAL_BUFFER_REQUEST = 0; // received signal: send the next input buffer
+	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3; // received signal: send the next buffer of the first iterator
+	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4; // received signal: send the next buffer of the second iterator
+	private static final int SIGNAL_FINISHED = -1; // received signal: ends the streaming loop
+	private static final int SIGNAL_ERROR = -2; // received signal: raised as a RuntimeException
+	private static final byte SIGNAL_LAST = 32; // flag byte of a write notification when no further buffer follows
+
+	private final byte[] buffer = new byte[4]; // backing array of every received signal
+	private DatagramPacket packet;
+	protected InetAddress host;
+
+	protected DatagramSocket socket;
+	protected int port1; // target port of write notifications
+	protected int port2; // target port of read confirmations
+	protected Sender sender;
+	protected Receiver receiver;
+
+	protected StringBuilder msg = new StringBuilder(); // appended to the messages of the exceptions thrown by this class
+
+	protected final AbstractRichFunction function;
+
+	public Streamer(AbstractRichFunction function) { // creates the sender and receiver for the given function
+		this.function = function;
+		sender = new Sender(function);
+		receiver = new Receiver(function);
+	}
+
+	/**
+	 * Opens the control socket, lets the subclass set up its resources and then waits for the two UDP port numbers.
+	 * Each receive may block for up to 10 seconds during this phase; afterwards the socket timeout is raised to
+	 * 300 seconds.
+	 *
+	 * @throws IOException if the socket cannot be created or the setup fails
+	 * @throws RuntimeException if no port number arrives before the timeout expires
+	 */
+	public void open() throws IOException {
+		host = InetAddress.getByName("localhost");
+		packet = new DatagramPacket(buffer, 0, 4);
+		socket = new DatagramSocket(0, host);
+		socket.setSoTimeout(10000);
+		try {
+			setupProcess();
+			setupPorts();
+		} catch (SocketTimeoutException ste) {
+			//keep the timeout as cause so the stack trace shows which receive gave up
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg, ste);
+		}
+		socket.setSoTimeout(300000);
+	}
+
+	/**
+	 * This method opens all required resources. It is called by open() before the UDP ports are received.
+	 *
+	 * @throws IOException
+	 */
+	public abstract void setupProcess() throws IOException;
+
+	/**
+	 * This method closes all previously opened resources. Every resource is closed even if closing another one
+	 * fails, and a socket that was never created is skipped.
+	 *
+	 * @throws IOException if the sender or the receiver cannot be closed
+	 */
+	public void close() throws IOException {
+		try {
+			if (socket != null) {
+				socket.close();
+			}
+		} finally {
+			try {
+				sender.close();
+			} finally {
+				//must run even if the sender failed to close
+				receiver.close();
+			}
+		}
+	}
+
+	/**
+	 * Sets up the required UDP ports. The streamer requires two UDP ports to send control signals to, one each for
+	 * reading/writing operations. Both port numbers are read from the socket as 4-byte integers, port1 first.
+	 *
+	 * @throws IOException
+	 */
+	private void setupPorts() throws IOException, SocketTimeoutException {
+		socket.receive(new DatagramPacket(buffer, 0, 4));
+		checkForError();
+		port1 = getInt(buffer, 0);
+		socket.receive(new DatagramPacket(buffer, 0, 4));
+		checkForError();
+		port2 = getInt(buffer, 0);
+	}
+
+	private void sendWriteNotification(int size, boolean hasNext) throws IOException { // sends 5 bytes to port1: size as int plus one flag byte
+		byte[] tmp = new byte[5];
+		putInt(tmp, 0, size);
+		tmp[4] = hasNext ? 0 : SIGNAL_LAST; // mark the final buffer
+		socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
+	}
+
+	private void sendReadConfirmation() throws IOException { // sends a single zero byte to port2
+		socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
+	}
+
+	/**
+	 * Checks whether the signal received last is the error signal and, if so, fails with an exception after a
+	 * short delay.
+	 *
+	 * @throws RuntimeException if the signal in the receive buffer is SIGNAL_ERROR
+	 */
+	private void checkForError() {
+		if (getInt(buffer, 0) == SIGNAL_ERROR) {
+			try { //wait before terminating to ensure that the complete error message is printed
+				Thread.sleep(2000);
+			} catch (InterruptedException ex) {
+				//restore the interrupt flag so that callers can still observe the interruption
+				Thread.currentThread().interrupt();
+			}
+			throw new RuntimeException(
+					"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+		}
+	}
+
+	/**
+	 * Sends all broadcast-variables encoded in the configuration to the external process. The number of variables is
+	 * sent first, followed for every variable by its name and its contents. Every write waits for a signal on the
+	 * socket beforehand.
+	 *
+	 * @param config configuration object containing broadcast-variable count and names
+	 * @throws IOException
+	 * @throws RuntimeException if an error signal is received or no signal arrives before the socket timeout
+	 */
+	public final void sendBroadCastVariables(Configuration config) throws IOException {
+		try {
+			int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+
+			String[] names = new String[broadcastCount];
+
+			for (int x = 0; x < names.length; x++) {
+				names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
+			}
+
+			socket.receive(packet);
+			checkForError();
+			int size = sender.sendRecord(broadcastCount);
+			sendWriteNotification(size, false);
+
+			for (String name : names) {
+				Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+
+				socket.receive(packet);
+				checkForError();
+				size = sender.sendRecord(name);
+				sendWriteNotification(size, false);
+
+				while (bcv.hasNext() || sender.hasRemaining(0)) {
+					socket.receive(packet);
+					checkForError();
+					size = sender.sendBuffer(bcv, 0);
+					sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
+				}
+				//the next variable may hold values of a different type
+				sender.reset();
+			}
+		} catch (SocketTimeoutException ste) {
+			//keep the timeout as cause so the stack trace shows which receive gave up
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg, ste);
+		}
+	}
+
+	/**
+	 * Sends all values contained in the iterator to the external process and collects all results. Returns
+	 * immediately if the iterator is empty; otherwise signals are processed until SIGNAL_FINISHED is received.
+	 *
+	 * @param i iterator
+	 * @param c collector
+	 * @throws IOException
+	 * @throws RuntimeException if an error signal is received or no signal arrives before the socket timeout
+	 */
+	public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+		try {
+			int size;
+			if (i.hasNext()) {
+				while (true) {
+					socket.receive(packet);
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST:
+							if (i.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+								//restore the interrupt flag so that callers can still observe the interruption
+								Thread.currentThread().interrupt();
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							//any other value is handed to the receiver, presumably as the size of a result buffer
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			//keep the timeout as cause so the stack trace shows which receive gave up
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg, ste);
+		}
+	}
+
+	/**
+	 * Sends all values contained in both iterators to the external process and collects all results. Returns
+	 * immediately if both iterators are empty; otherwise signals are processed until SIGNAL_FINISHED is received.
+	 *
+	 * @param i1 iterator sent as group 0
+	 * @param i2 iterator sent as group 1
+	 * @param c collector
+	 * @throws IOException
+	 * @throws RuntimeException if an error signal is received or no signal arrives before the socket timeout
+	 */
+	public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+		try {
+			int size;
+			if (i1.hasNext() || i2.hasNext()) {
+				while (true) {
+					socket.receive(packet);
+					int sig = getInt(buffer, 0);
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST_G0:
+							if (i1.hasNext() || sender.hasRemaining(0)) {
+								size = sender.sendBuffer(i1, 0);
+								sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
+							}
+							break;
+						case SIGNAL_BUFFER_REQUEST_G1:
+							if (i2.hasNext() || sender.hasRemaining(1)) {
+								size = sender.sendBuffer(i2, 1);
+								sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try { //wait before terminating to ensure that the complete error message is printed
+								Thread.sleep(2000);
+							} catch (InterruptedException ex) {
+								//restore the interrupt flag so that callers can still observe the interruption
+								Thread.currentThread().interrupt();
+							}
+							throw new RuntimeException(
+									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							//any other value is handed to the receiver, presumably as the size of a result buffer
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ste) {
+			//keep the timeout as cause so the stack trace shows which receive gave up
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg, ste);
+		}
+	}
+
+	protected final static int getInt(byte[] array, int offset) {
+		return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
+	}
+
+	/**
+	 * Writes an int as four consecutive bytes, most significant byte first.
+	 *
+	 * @param array target of the bytes
+	 * @param offset index that receives the most significant byte
+	 * @param value value to write
+	 */
+	protected final static void putInt(byte[] array, int offset, int value) {
+		for (int k = 0; k < 4; k++) {
+			//shift the wanted byte into the lowest position, the cast drops the rest
+			array[offset + k] = (byte) (value >> (24 - 8 * k));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/pom.xml b/flink-staging/flink-language-binding/pom.xml
new file mode 100644
index 0000000..e4843cd
--- /dev/null
+++ b/flink-staging/flink-language-binding/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-language-binding-parent</artifactId>
+	<name>flink-language-binding</name>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-language-binding-generic</module>
+	</modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 7a364ab..62a2fcd 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -45,6 +45,7 @@ under the License.
 		<module>flink-hcatalog</module>
 		<module>flink-table</module>
 		<module>flink-ml</module>
+		<module>flink-language-binding</module>
 	</modules>
 	
 	<!-- See main pom.xml for explanation of profiles -->