You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/21 15:47:44 UTC
[08/10] flink git commit: [FLINK-377] Generic Interface
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
new file mode 100644
index 0000000..d45b019
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
+
+/**
+ * General-purpose class to write data to memory-mapped files.
+ */
+public class Sender implements Serializable {
+ public static final byte TYPE_TUPLE = (byte) 11;
+ public static final byte TYPE_BOOLEAN = (byte) 10;
+ public static final byte TYPE_BYTE = (byte) 9;
+ public static final byte TYPE_SHORT = (byte) 8;
+ public static final byte TYPE_INTEGER = (byte) 7;
+ public static final byte TYPE_LONG = (byte) 6;
+ public static final byte TYPE_DOUBLE = (byte) 4;
+ public static final byte TYPE_FLOAT = (byte) 5;
+ public static final byte TYPE_CHAR = (byte) 3;
+ public static final byte TYPE_STRING = (byte) 2;
+ public static final byte TYPE_BYTES = (byte) 1;
+ public static final byte TYPE_NULL = (byte) 0;
+
+ private final AbstractRichFunction function;
+
+ private File outputFile;
+ private RandomAccessFile outputRAF;
+ private FileChannel outputChannel;
+ private MappedByteBuffer fileBuffer;
+
+ private final ByteBuffer[] saved = new ByteBuffer[2];
+
+ private final Serializer[] serializer = new Serializer[2];
+
+ public Sender(AbstractRichFunction function) {
+ this.function = function;
+ }
+
+ //=====Setup========================================================================================================
+ public void open(String path) throws IOException {
+ setupMappedFile(path);
+ }
+
+ private void setupMappedFile(String path) throws FileNotFoundException, IOException {
+ String outputFilePath = function == null
+ ? FLINK_TMP_DATA_DIR + "/" + "input"
+ : path;
+
+ File x = new File(FLINK_TMP_DATA_DIR);
+ x.mkdirs();
+
+ outputFile = new File(outputFilePath);
+ if (outputFile.exists()) {
+ outputFile.delete();
+ }
+ outputFile.createNewFile();
+ outputRAF = new RandomAccessFile(outputFilePath, "rw");
+ outputRAF.setLength(MAPPED_FILE_SIZE);
+ outputRAF.seek(MAPPED_FILE_SIZE - 1);
+ outputRAF.writeByte(0);
+ outputRAF.seek(0);
+ outputChannel = outputRAF.getChannel();
+ fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+ }
+
+ public void close() throws IOException {
+ closeMappedFile();
+ }
+
+ private void closeMappedFile() throws IOException {
+ outputChannel.close();
+ outputRAF.close();
+ }
+
+ /**
+ * Resets this object to the post-configuration state.
+ */
+ public void reset() {
+ serializer[0] = null;
+ serializer[1] = null;
+ fileBuffer.clear();
+ }
+
+ //=====Serialization================================================================================================
+ /**
+ * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
+ * must guarantee that the file may be written to before calling this method. This method essentially reserves the
+ * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
+ * absolutely necessary.
+ *
+ * @param value record to send
+ * @return size of the written buffer
+ * @throws IOException
+ */
+ public int sendRecord(Object value) throws IOException {
+ fileBuffer.clear();
+ int group = 0;
+
+ serializer[group] = getSerializer(value);
+ ByteBuffer bb = serializer[group].serialize(value);
+ if (bb.remaining() > MAPPED_FILE_SIZE) {
+ throw new RuntimeException("Serialized object does not fit into a single buffer.");
+ }
+ fileBuffer.put(bb);
+
+ int size = fileBuffer.position();
+
+ reset();
+ return size;
+ }
+
+ public boolean hasRemaining(int group) {
+ return saved[group] != null;
+ }
+
+ /**
+ * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+ * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+ * guarantee that the file may be written to before calling this method.
+ *
+ * @param i iterator containing records
+ * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
+ * @return size of the written buffer
+ * @throws IOException
+ */
+ public int sendBuffer(Iterator i, int group) throws IOException {
+ fileBuffer.clear();
+
+ Object value;
+ ByteBuffer bb;
+ if (serializer[group] == null) {
+ value = i.next();
+ serializer[group] = getSerializer(value);
+ bb = serializer[group].serialize(value);
+ if (bb.remaining() > MAPPED_FILE_SIZE) {
+ throw new RuntimeException("Serialized object does not fit into a single buffer.");
+ }
+ fileBuffer.put(bb);
+
+ }
+ if (saved[group] != null) {
+ fileBuffer.put(saved[group]);
+ saved[group] = null;
+ }
+ while (i.hasNext() && saved[group] == null) {
+ value = i.next();
+ bb = serializer[group].serialize(value);
+ if (bb.remaining() > MAPPED_FILE_SIZE) {
+ throw new RuntimeException("Serialized object does not fit into a single buffer.");
+ }
+ if (bb.remaining() <= fileBuffer.remaining()) {
+ fileBuffer.put(bb);
+ } else {
+ saved[group] = bb;
+ }
+ }
+
+ int size = fileBuffer.position();
+ return size;
+ }
+
+ private enum SupportedTypes {
+ TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
+ }
+
+ //=====Serializer===================================================================================================
+ private Serializer getSerializer(Object value) throws IOException {
+ String className = value.getClass().getSimpleName().toUpperCase();
+ if (className.startsWith("TUPLE")) {
+ className = "TUPLE";
+ }
+ if (className.startsWith("BYTE[]")) {
+ className = "BYTES";
+ }
+ SupportedTypes type = SupportedTypes.valueOf(className);
+ switch (type) {
+ case TUPLE:
+ fileBuffer.put(TYPE_TUPLE);
+ fileBuffer.putInt(((Tuple) value).getArity());
+ return new TupleSerializer((Tuple) value);
+ case BOOLEAN:
+ fileBuffer.put(TYPE_BOOLEAN);
+ return new BooleanSerializer();
+ case BYTE:
+ fileBuffer.put(TYPE_BYTE);
+ return new ByteSerializer();
+ case BYTES:
+ fileBuffer.put(TYPE_BYTES);
+ return new BytesSerializer();
+ case CHARACTER:
+ fileBuffer.put(TYPE_CHAR);
+ return new CharSerializer();
+ case SHORT:
+ fileBuffer.put(TYPE_SHORT);
+ return new ShortSerializer();
+ case INTEGER:
+ fileBuffer.put(TYPE_INTEGER);
+ return new IntSerializer();
+ case LONG:
+ fileBuffer.put(TYPE_LONG);
+ return new LongSerializer();
+ case STRING:
+ fileBuffer.put(TYPE_STRING);
+ return new StringSerializer();
+ case FLOAT:
+ fileBuffer.put(TYPE_FLOAT);
+ return new FloatSerializer();
+ case DOUBLE:
+ fileBuffer.put(TYPE_DOUBLE);
+ return new DoubleSerializer();
+ case NULL:
+ fileBuffer.put(TYPE_NULL);
+ return new NullSerializer();
+ default:
+ throw new IllegalArgumentException("Unknown Type encountered: " + type);
+ }
+ }
+
+ private abstract class Serializer<T> {
+ protected ByteBuffer buffer;
+
+ public Serializer(int capacity) {
+ buffer = ByteBuffer.allocate(capacity);
+ }
+
+ public ByteBuffer serialize(T value) {
+ buffer.clear();
+ serializeInternal(value);
+ buffer.flip();
+ return buffer;
+ }
+
+ public abstract void serializeInternal(T value);
+ }
+
+ private class ByteSerializer extends Serializer<Byte> {
+ public ByteSerializer() {
+ super(1);
+ }
+
+ @Override
+ public void serializeInternal(Byte value) {
+ buffer.put(value);
+ }
+ }
+
+ private class BooleanSerializer extends Serializer<Boolean> {
+ public BooleanSerializer() {
+ super(1);
+ }
+
+ @Override
+ public void serializeInternal(Boolean value) {
+ buffer.put(value ? (byte) 1 : (byte) 0);
+ }
+ }
+
+ private class CharSerializer extends Serializer<Character> {
+ public CharSerializer() {
+ super(4);
+ }
+
+ @Override
+ public void serializeInternal(Character value) {
+ buffer.put((value + "").getBytes());
+ }
+ }
+
+ private class ShortSerializer extends Serializer<Short> {
+ public ShortSerializer() {
+ super(2);
+ }
+
+ @Override
+ public void serializeInternal(Short value) {
+ buffer.putShort(value);
+ }
+ }
+
+ private class IntSerializer extends Serializer<Integer> {
+ public IntSerializer() {
+ super(4);
+ }
+
+ @Override
+ public void serializeInternal(Integer value) {
+ buffer.putInt(value);
+ }
+ }
+
+ private class LongSerializer extends Serializer<Long> {
+ public LongSerializer() {
+ super(8);
+ }
+
+ @Override
+ public void serializeInternal(Long value) {
+ buffer.putLong(value);
+ }
+ }
+
+ private class StringSerializer extends Serializer<String> {
+ public StringSerializer() {
+ super(0);
+ }
+
+ @Override
+ public void serializeInternal(String value) {
+ byte[] bytes = value.getBytes();
+ buffer = ByteBuffer.allocate(bytes.length + 4);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ }
+ }
+
+ private class FloatSerializer extends Serializer<Float> {
+ public FloatSerializer() {
+ super(4);
+ }
+
+ @Override
+ public void serializeInternal(Float value) {
+ buffer.putFloat(value);
+ }
+ }
+
+ private class DoubleSerializer extends Serializer<Double> {
+ public DoubleSerializer() {
+ super(8);
+ }
+
+ @Override
+ public void serializeInternal(Double value) {
+ buffer.putDouble(value);
+ }
+ }
+
+ private class NullSerializer extends Serializer<Object> {
+ public NullSerializer() {
+ super(0);
+ }
+
+ @Override
+ public void serializeInternal(Object value) {
+ }
+ }
+
+ private class BytesSerializer extends Serializer<byte[]> {
+ public BytesSerializer() {
+ super(0);
+ }
+
+ @Override
+ public void serializeInternal(byte[] value) {
+ buffer = ByteBuffer.allocate(4 + value.length);
+ buffer.putInt(value.length);
+ buffer.put(value);
+ }
+ }
+
+ private class TupleSerializer extends Serializer<Tuple> {
+ private final Serializer[] serializer;
+ private final List<ByteBuffer> buffers;
+
+ public TupleSerializer(Tuple value) throws IOException {
+ super(0);
+ serializer = new Serializer[value.getArity()];
+ buffers = new ArrayList();
+ for (int x = 0; x < serializer.length; x++) {
+ serializer[x] = getSerializer(value.getField(x));
+ }
+ }
+
+ @Override
+ public void serializeInternal(Tuple value) {
+ int length = 0;
+ for (int x = 0; x < serializer.length; x++) {
+ serializer[x].buffer.clear();
+ serializer[x].serializeInternal(value.getField(x));
+ length += serializer[x].buffer.position();
+ buffers.add(serializer[x].buffer);
+ }
+ buffer = ByteBuffer.allocate(length);
+ for (ByteBuffer b : buffers) {
+ b.flip();
+ buffer.put(b);
+ }
+ buffers.clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
new file mode 100644
index 0000000..1ad0606
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+/**
+ * Simple utility class to print all contents of an inputstream to stdout.
+ */
+public class StreamPrinter extends Thread {
+ private final BufferedReader reader;
+ private final boolean wrapInException;
+ private StringBuilder msg;
+
+ public StreamPrinter(InputStream stream) {
+ this(stream, false, null);
+ }
+
+ public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
+ this.reader = new BufferedReader(new InputStreamReader(stream));
+ this.wrapInException = wrapInException;
+ this.msg = msg;
+ }
+
+ @Override
+ public void run() {
+ String line;
+ try {
+ if (wrapInException) {
+ while ((line = reader.readLine()) != null) {
+ msg.append("\n" + line);
+ }
+ } else {
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ System.out.flush();
+ }
+ }
+ } catch (IOException ex) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
new file mode 100644
index 0000000..1a96e98
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.common.streaming;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
+import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the basis for using an external process within a Java Flink operator. It contains logic to send and
+ * receive data, while taking care of synchronization.
+ */
+public abstract class Streamer implements Serializable {
+ protected static final Logger LOG = LoggerFactory.getLogger(Streamer.class);
+ private static final int SIGNAL_BUFFER_REQUEST = 0;
+ private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
+ private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
+ private static final int SIGNAL_FINISHED = -1;
+ private static final int SIGNAL_ERROR = -2;
+ private static final byte SIGNAL_LAST = 32;
+
+ private final byte[] buffer = new byte[4];
+ private DatagramPacket packet;
+ protected InetAddress host;
+
+ protected DatagramSocket socket;
+ protected int port1;
+ protected int port2;
+ protected Sender sender;
+ protected Receiver receiver;
+
+ protected StringBuilder msg = new StringBuilder();
+
+ protected final AbstractRichFunction function;
+
+ public Streamer(AbstractRichFunction function) {
+ this.function = function;
+ sender = new Sender(function);
+ receiver = new Receiver(function);
+ }
+
+ public void open() throws IOException {
+ host = InetAddress.getByName("localhost");
+ packet = new DatagramPacket(buffer, 0, 4);
+ socket = new DatagramSocket(0, host);
+ socket.setSoTimeout(10000);
+ try {
+ setupProcess();
+ setupPorts();
+ } catch (SocketTimeoutException ste) {
+ throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+ }
+ socket.setSoTimeout(300000);
+ }
+
+ /**
+ * This method opens all required resources-
+ *
+ * @throws IOException
+ */
+ public abstract void setupProcess() throws IOException;
+
+ /**
+ * This method closes all previously opened resources.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ socket.close();
+ sender.close();
+ receiver.close();
+ }
+
+ /**
+ * Setups the required UDP-ports.The streamer requires two UDP-ports to send control-signals to, one each for
+ * reading/writing operations.
+ *
+ * @throws IOException
+ */
+ private void setupPorts() throws IOException, SocketTimeoutException {
+ socket.receive(new DatagramPacket(buffer, 0, 4));
+ checkForError();
+ port1 = getInt(buffer, 0);
+ socket.receive(new DatagramPacket(buffer, 0, 4));
+ checkForError();
+ port2 = getInt(buffer, 0);
+ }
+
+ private void sendWriteNotification(int size, boolean hasNext) throws IOException {
+ byte[] tmp = new byte[5];
+ putInt(tmp, 0, size);
+ tmp[4] = hasNext ? 0 : SIGNAL_LAST;
+ socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
+ }
+
+ private void sendReadConfirmation() throws IOException {
+ socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
+ }
+
+ private void checkForError() {
+ if (getInt(buffer, 0) == -2) {
+ try { //wait before terminating to ensure that the complete error message is printed
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ }
+ throw new RuntimeException(
+ "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+ }
+ }
+
+ /**
+ * Sends all broadcast-variables encoded in the configuration to the external process.
+ *
+ * @param config configuration object containing broadcast-variable count and names
+ * @throws IOException
+ */
+ public final void sendBroadCastVariables(Configuration config) throws IOException {
+ try {
+ int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+
+ String[] names = new String[broadcastCount];
+
+ for (int x = 0; x < names.length; x++) {
+ names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
+ }
+
+ socket.receive(packet);
+ checkForError();
+ int size = sender.sendRecord(broadcastCount);
+ sendWriteNotification(size, false);
+
+ for (String name : names) {
+ Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator();
+
+ socket.receive(packet);
+ checkForError();
+ size = sender.sendRecord(name);
+ sendWriteNotification(size, false);
+
+ while (bcv.hasNext() || sender.hasRemaining(0)) {
+ socket.receive(packet);
+ checkForError();
+ size = sender.sendBuffer(bcv, 0);
+ sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0));
+ }
+ sender.reset();
+ }
+ } catch (SocketTimeoutException ste) {
+ throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+ }
+ }
+
+ /**
+ * Sends all values contained in the iterator to the external process and collects all results.
+ *
+ * @param i iterator
+ * @param c collector
+ * @throws IOException
+ */
+ public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException {
+ try {
+ int size;
+ if (i.hasNext()) {
+ while (true) {
+ socket.receive(packet);
+ int sig = getInt(buffer, 0);
+ switch (sig) {
+ case SIGNAL_BUFFER_REQUEST:
+ if (i.hasNext() || sender.hasRemaining(0)) {
+ size = sender.sendBuffer(i, 0);
+ sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+ }
+ break;
+ case SIGNAL_FINISHED:
+ return;
+ case SIGNAL_ERROR:
+ try { //wait before terminating to ensure that the complete error message is printed
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ }
+ throw new RuntimeException(
+ "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+ default:
+ receiver.collectBuffer(c, sig);
+ sendReadConfirmation();
+ break;
+ }
+ }
+ }
+ } catch (SocketTimeoutException ste) {
+ throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+ }
+ }
+
+ /**
+ * Sends all values contained in both iterators to the external process and collects all results.
+ *
+ * @param i1 iterator
+ * @param i2 iterator
+ * @param c collector
+ * @throws IOException
+ */
+ public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException {
+ try {
+ int size;
+ if (i1.hasNext() || i2.hasNext()) {
+ while (true) {
+ socket.receive(packet);
+ int sig = getInt(buffer, 0);
+ switch (sig) {
+ case SIGNAL_BUFFER_REQUEST_G0:
+ if (i1.hasNext() || sender.hasRemaining(0)) {
+ size = sender.sendBuffer(i1, 0);
+ sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
+ }
+ break;
+ case SIGNAL_BUFFER_REQUEST_G1:
+ if (i2.hasNext() || sender.hasRemaining(1)) {
+ size = sender.sendBuffer(i2, 1);
+ sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
+ }
+ break;
+ case SIGNAL_FINISHED:
+ return;
+ case SIGNAL_ERROR:
+ try { //wait before terminating to ensure that the complete error message is printed
+ Thread.sleep(2000);
+ } catch (InterruptedException ex) {
+ }
+ throw new RuntimeException(
+ "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+ default:
+ receiver.collectBuffer(c, sig);
+ sendReadConfirmation();
+ break;
+ }
+ }
+ }
+ } catch (SocketTimeoutException ste) {
+ throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+ }
+ }
+
+ protected final static int getInt(byte[] array, int offset) {
+ return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
+ }
+
+ protected final static void putInt(byte[] array, int offset, int value) {
+ array[offset] = (byte) (value >> 24);
+ array[offset + 1] = (byte) (value >> 16);
+ array[offset + 2] = (byte) (value >> 8);
+ array[offset + 3] = (byte) (value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/flink-language-binding/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/pom.xml b/flink-staging/flink-language-binding/pom.xml
new file mode 100644
index 0000000..e4843cd
--- /dev/null
+++ b/flink-staging/flink-language-binding/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-staging</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-language-binding-parent</artifactId>
+ <name>flink-language-binding</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-language-binding-generic</module>
+ </modules>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/af9248c3/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index 7a364ab..62a2fcd 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -45,6 +45,7 @@ under the License.
<module>flink-hcatalog</module>
<module>flink-table</module>
<module>flink-ml</module>
+ <module>flink-language-binding</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->