You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/10 13:55:51 UTC

[1/3] flink git commit: [FLINK-2631] [streaming] Fixes the StreamFold operator and adds OutputTypeConfigurable interface to support type injection at StreamGraph creation.

Repository: flink
Updated Branches:
  refs/heads/master c94fdcdd5 -> 8754352ff


[FLINK-2631] [streaming] Fixes the StreamFold operator and adds OutputTypeConfigurable interface to support type injection at StreamGraph creation.

Adds test for non-serializable fold type. Adds test to verify proper output type forwarding for OutputTypeConfigurable implementations.
Makes OutputTypeConfigurable typed, tests that TwoInputStreamOperator is output type configurable

This closes #1101


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c2791b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c2791b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c2791b0

Branch: refs/heads/master
Commit: 9c2791b0a1b8bd3b0fb189220749d1c82f7a0d09
Parents: c94fdcd
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 7 11:34:48 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:28:47 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/GroupedDataStream.java       |   2 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  35 ++-
 .../api/graph/StreamGraphGenerator.java         |   2 +-
 .../api/operators/OutputTypeConfigurable.java   |  42 ++++
 .../streaming/api/operators/StreamFold.java     |  51 +++-
 .../api/operators/StreamGroupedFold.java        |  28 ++-
 .../api/operators/StreamGroupedReduce.java      |   8 +-
 .../streaming/api/operators/StreamReduce.java   |   3 +-
 .../streaming/api/StreamingOperatorsITCase.java | 230 +++++++++++++++++++
 .../api/graph/StreamGraphGeneratorTest.java     | 129 ++++++++++-
 .../api/operators/StreamGroupedFoldTest.java    |  10 +-
 ...ScalaStreamingMultipleProgramsTestBase.scala |  55 +++++
 .../api/scala/StreamingOperatorsITCase.scala    | 116 ++++++++++
 13 files changed, 679 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 72ef945..a1106bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -88,7 +88,7 @@ public class GroupedDataStream<OUT> extends KeyedDataStream<OUT> {
 				Utils.getCallLocationName(), true);
 
 		return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder),
-				keySelector, initialValue, outType));
+				keySelector, initialValue));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 6474ae9..cda5686 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -190,8 +191,12 @@ public class StreamGraph extends StreamingPlan {
 		sinks.add(vertexID);
 	}
 
-	public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject,
-			TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+	public <IN, OUT> void addOperator(
+			Integer vertexID,
+			StreamOperator<OUT> operatorObject,
+			TypeInformation<IN> inTypeInfo,
+			TypeInformation<OUT> outTypeInfo,
+			String operatorName) {
 
 		if (operatorObject instanceof StreamSource) {
 			addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
@@ -205,22 +210,40 @@ public class StreamGraph extends StreamingPlan {
 
 		setSerializers(vertexID, inSerializer, null, outSerializer);
 
+		if (operatorObject instanceof OutputTypeConfigurable) {
+			@SuppressWarnings("unchecked")
+			OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
+			// sets the output type which must be known at StreamGraph creation time
+			outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
+		}
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Vertex: {}", vertexID);
 		}
 	}
 
-	public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-			TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
-			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+	public <IN1, IN2, OUT> void addCoOperator(
+			Integer vertexID,
+			TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
+			TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo,
+			TypeInformation<OUT> outTypeInfo,
+			String operatorName) {
 
-		addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName);
+		addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
 
 		TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
 				outTypeInfo.createSerializer(executionConfig) : null;
 
 		setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer);
 
+		if (taskOperatorObject instanceof OutputTypeConfigurable) {
+			@SuppressWarnings("unchecked")
+			OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject;
+			// sets the output type which must be known at StreamGraph creation time
+			outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
+		}
+
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexID);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 6df8cb5..774c00b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -47,7 +47,7 @@ import java.util.Map;
  *
  * <p>
  * This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
- * we transformation recursively transform the inputs, then create a node in the {@code StreamGraph}
+ * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
  * and add edges from the input Nodes to our newly created node. The transformation methods
  * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
  * IDs can be returned to be able to deal with feedback transformations and unions.

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
new file mode 100644
index 0000000..1d05966
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Stream operators can implement this interface if they need access to the output type information
+ * at {@link org.apache.flink.streaming.api.graph.StreamGraph} generation. This can be useful for
+ * cases where the output type is specified by the returns method and, thus, after the stream
+ * operator has been created.
+ */
+public interface OutputTypeConfigurable<OUT> {
+
+	/**
+	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
+	 * and {@code StreamGraph#addCoOperator} methods when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
+	 * method is called with the output {@link TypeInformation} which is also used for the
+	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.
+	 *
+	 * @param outTypeInfo Output type information of the {@link org.apache.flink.streaming.runtime.tasks.StreamTask}
+	 * @param executionConfig Execution configuration
+	 */
+	void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index a5e5264..81115f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -17,27 +17,36 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public class StreamFold<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT> {
+		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
 
 	private static final long serialVersionUID = 1L;
 
-	private OUT accumulator;
+	protected transient OUT accumulator;
+	private byte[] serializedInitialValue;
+
 	protected TypeSerializer<OUT> outTypeSerializer;
-	protected TypeInformation<OUT> outTypeInformation;
 
-	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public StreamFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(folder);
 		this.accumulator = initialValue;
-		this.outTypeInformation = outTypeInformation;
 		this.chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
 	}
 
@@ -50,11 +59,41 @@ public class StreamFold<IN, OUT>
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+
+		if (serializedInitialValue == null) {
+			throw new RuntimeException("No initial value was serialized for the fold " +
+					"operator. Probably the setOutputType method was not called.");
+		}
+
+		ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+		InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(
+			new DataInputStream(bais)
+		);
+
+		accumulator = outTypeSerializer.deserialize(in);
 	}
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		output.emitWatermark(mark);
 	}
+
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		outTypeSerializer = outTypeInfo.createSerializer(executionConfig);
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper out = new OutputViewDataOutputStreamWrapper(
+			new DataOutputStream(baos)
+		);
+
+		try {
+			outTypeSerializer.serialize(accumulator, out);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Unable to serialize initial value of type " +
+					accumulator.getClass().getSimpleName() + " of fold operator.", ioe);
+		}
+
+		serializedInitialValue = baos.toByteArray();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 5272a48..f4e44c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -21,8 +21,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
@@ -30,28 +30,34 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private KeySelector<IN, ?> keySelector;
-	private Map<Object, OUT> values;
-	private OUT initialValue;
+	private transient Map<Object, OUT> values;
 
-	public StreamGroupedFold(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
-			OUT initialValue, TypeInformation<OUT> outTypeInformation) {
-		super(folder, initialValue, outTypeInformation);
+	public StreamGroupedFold(
+			FoldFunction<IN, OUT> folder,
+			KeySelector<IN, ?> keySelector,
+			OUT initialValue) {
+		super(folder, initialValue);
 		this.keySelector = keySelector;
-		this.initialValue = initialValue;
+	}
+
+	@Override
+	public void open(Configuration configuration) throws Exception {
+		super.open(configuration);
+
 		values = new HashMap<Object, OUT>();
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
-		OUT accumulator = values.get(key);
+		OUT value = values.get(key);
 
-		if (accumulator != null) {
-			OUT folded = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
+		if (value != null) {
+			OUT folded = userFunction.fold(outTypeSerializer.copy(value), element.getValue());
 			values.put(key, folded);
 			output.collect(element.replace(folded));
 		} else {
-			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
+			OUT first = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
 			values.put(key, first);
 			output.collect(element.replace(first));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 6be011e..7533c33 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -29,17 +29,21 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private KeySelector<IN, ?> keySelector;
-	private Map<Object, IN> values;
+	private transient Map<Object, IN> values;
 
 	public StreamGroupedReduce(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
 		super(reducer);
 		this.keySelector = keySelector;
-		values = new HashMap<Object, IN>();
 	}
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		Object key = keySelector.getKey(element.getValue());
+
+		if (values == null) {
+			values = new HashMap<Object, IN>();
+		}
+
 		IN currentValue = values.get(key);
 		if (currentValue != null) {
 			// TODO: find a way to let operators copy elements (maybe)

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 52c07d0..af562fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -26,7 +26,7 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti
 
 	private static final long serialVersionUID = 1L;
 
-	private IN currentValue;
+	private transient IN currentValue;
 
 	public StreamReduce(ReduceFunction<IN> reducer) {
 		super(reducer);
@@ -42,7 +42,6 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti
 			currentValue = userFunction.reduce(currentValue, element.getValue());
 		} else {
 			currentValue = element.getValue();
-
 		}
 		output.collect(element.replace(currentValue));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
new file mode 100644
index 0000000..11100a4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamingOperatorsITCase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase {
+
+	private String resultPath1;
+	private String resultPath2;
+	private String expected1;
+	private String expected2;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath1 = tempFolder.newFile().toURI().toString();
+		resultPath2 = tempFolder.newFile().toURI().toString();
+		expected1 = "";
+		expected2 = "";
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected1, resultPath1);
+		compareResultsByLinesInMemory(expected2, resultPath2);
+	}
+
+	/**
+	 * Tests the proper functioning of the streaming fold operator. For this purpose, a stream
+	 * of {@code Tuple2<Integer, Integer>} is created. The stream is grouped according to the first tuple
+	 * value. Each group is folded where the second tuple value is summed up.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testFoldOperation() throws Exception {
+		int numElements = 10;
+		int numKeys = 2;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
+
+		SplitDataStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
+			.groupBy(0)
+			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
+				@Override
+				public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
+					return accumulator + value.f1;
+				}
+			}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+				@Override
+				public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+					return new Tuple2<Integer, Integer>(getRuntimeContext().getIndexOfThisSubtask(), value);
+				}
+			}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
+				@Override
+				public Iterable<String> select(Tuple2<Integer, Integer> value) {
+					List<String> output = new ArrayList<>();
+
+					output.add(value.f0 + "");
+
+					return output;
+				}
+			});
+
+		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
+			@Override
+			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f1;
+			}
+		}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
+			@Override
+			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+				return value.f1;
+			}
+		}).writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE);
+
+		StringBuilder builder1 = new StringBuilder();
+		StringBuilder builder2 = new StringBuilder();
+		int counter1 = 0;
+		int counter2 = 0;
+
+		for (int i = 0; i < numElements; i++) {
+			if (i % 2 == 0) {
+				counter1 += i;
+				builder1.append(counter1 + "\n");
+			} else {
+				counter2 += i;
+				builder2.append(counter2 + "\n");
+			}
+		}
+
+		expected1 = builder1.toString();
+		expected2 = builder2.toString();
+
+		env.execute();
+	}
+
+	/**
+	 * Tests whether the fold operation can also be called with types that are not Java serializable.
+	 */
+	@Test
+	public void testFoldOperationWithNonJavaSerializableType() throws Exception {
+		final int numElements = 10;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
+
+		input
+			.groupBy(0)
+			.fold(
+				new NonSerializable(42),
+				new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
+					@Override
+					public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
+						return new NonSerializable(accumulator.value + value.f1.value);
+					}
+			})
+			.map(new MapFunction<NonSerializable, Integer>() {
+				@Override
+				public Integer map(NonSerializable value) throws Exception {
+					return value.value;
+				}
+			})
+			.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
+
+		StringBuilder builder = new StringBuilder();
+
+		for (int i = 0; i < numElements; i++) {
+			builder.append(42 + i + "\n");
+		}
+
+		expected1 = builder.toString();
+
+		env.execute();
+	}
+
+	private static class NonSerializable {
+		// This makes the type non-serializable
+		private final Object obj = new Object();
+
+		private final int value;
+
+		public NonSerializable(int value) {
+			this.value = value;
+		}
+	}
+
+	private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
+		private final int numElements;
+
+		public NonSerializableTupleSource(int numElements) {
+			this.numElements = numElements;
+		}
+
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
+			for (int i = 0; i < numElements; i++) {
+				ctx.collect(new Tuple2<Integer, NonSerializable>(i, new NonSerializable(i)));
+			}
+		}
+
+		@Override
+		public void cancel() {}
+	}
+
+	private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
+
+		private final int numElements;
+		private final int numKeys;
+
+		public TupleSource(int numElements, int numKeys) {
+			this.numElements = numElements;
+			this.numKeys = numKeys;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			for (int i = 0; i < numElements; i++) {
+				Tuple2<Integer, Integer> result = new Tuple2<>(i % numKeys, i);
+				ctx.collect(result);
+			}
+		}
+
+		@Override
+		public void cancel() {
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index fb2ef56..3b05274 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -17,18 +17,29 @@
  */
 package org.apache.flink.streaming.api.graph;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -176,4 +187,120 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
 
 	}
 
+	/**
+	 * Test whether an {@link OutputTypeConfigurable} implementation gets called with the correct
+	 * output type. In this test case the output type must be BasicTypeInfo.INT_TYPE_INFO.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testOutputTypeConfigurationWithOneInputTransformation() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10);
+
+		OutputTypeConfigurableOperationWithOneInput outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithOneInput();
+
+		DataStream<Integer> result = source.transform(
+			"Single input and output type configurable operation",
+			BasicTypeInfo.INT_TYPE_INFO,
+			outputTypeConfigurableOperation);
+
+		result.addSink(new NoOpSink<Integer>());
+
+		StreamGraph graph = env.getStreamGraph();
+
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
+	}
+
+	@Test
+	public void testOutputTypeConfigurationWithTwoInputTransformation() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source1 = env.fromElements(1, 10);
+		DataStream<Integer> source2 = env.fromElements(2, 11);
+
+		ConnectedDataStream<Integer, Integer> connectedSource = source1.connect(source2);
+
+		OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
+
+		DataStream<Integer> result = connectedSource.transform(
+				"Two input and output type configurable operation",
+				BasicTypeInfo.INT_TYPE_INFO,
+				outputTypeConfigurableOperation);
+
+		result.addSink(new NoOpSink<Integer>());
+
+		StreamGraph graph = env.getStreamGraph();
+
+		assertEquals(BasicTypeInfo.INT_TYPE_INFO, outputTypeConfigurableOperation.getTypeInformation());
+	}
+
+	private static class OutputTypeConfigurableOperationWithTwoInputs
+			extends AbstractStreamOperator<Integer>
+			implements TwoInputStreamOperator<Integer, Integer, Integer>, OutputTypeConfigurable<Integer> {
+
+		TypeInformation<Integer> tpeInformation;
+
+		public TypeInformation<Integer> getTypeInformation() {
+			return tpeInformation;
+		}
+
+		@Override
+		public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
+			tpeInformation = outTypeInfo;
+		}
+
+		@Override
+		public void processElement1(StreamRecord element) throws Exception {
+			output.collect(element);
+		}
+
+		@Override
+		public void processElement2(StreamRecord element) throws Exception {
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark1(Watermark mark) throws Exception {
+
+		}
+
+		@Override
+		public void processWatermark2(Watermark mark) throws Exception {
+
+		}
+
+		@Override
+		public void setup(Output output, StreamingRuntimeContext runtimeContext) {
+
+		}
+	}
+
+	private static class OutputTypeConfigurableOperationWithOneInput
+			extends AbstractStreamOperator<Integer>
+			implements OneInputStreamOperator<Integer, Integer>, OutputTypeConfigurable<Integer> {
+
+		TypeInformation<Integer> tpeInformation;
+
+		public TypeInformation<Integer> getTypeInformation() {
+			return tpeInformation;
+		}
+
+		@Override
+		public void processElement(StreamRecord<Integer> element) throws Exception {
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+
+		}
+
+		@Override
+		public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
+			tpeInformation = outTypeInfo;
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index dcfe3de..82dddfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -69,7 +70,9 @@ public class StreamGroupedFoldTest {
 			public String getKey(Integer value) throws Exception {
 				return value.toString();
 			}
-		}, "100", outType);
+		}, "100");
+
+		operator.setOutputType(outType, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
@@ -104,7 +107,10 @@ public class StreamGroupedFoldTest {
 			public Integer getKey(Integer value) throws Exception {
 				return value;
 			}
-		}, "init", BasicTypeInfo.STRING_TYPE_INFO);
+		}, "init");
+
+		operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
+
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
 
 		long initialTime = 0L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
new file mode 100644
index 0000000..3342e1e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.streaming.util.TestStreamEnvironment
+import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.junit.JUnitSuiteLike
+
+trait ScalaStreamingMultipleProgramsTestBase
+  extends TestBaseUtils
+  with  JUnitSuiteLike
+  with BeforeAndAfterAll {
+
+  val parallelism = 4
+  var cluster: Option[ForkableFlinkMiniCluster] = None
+
+  override protected def beforeAll(): Unit = {
+    // Assign the trait field (not a shadowing local) so afterAll() can stop the cluster.
+    cluster = Some(
+      TestBaseUtils.startCluster(
+        1,
+        parallelism,
+        StreamingMode.STREAMING,
+        false,
+        false,
+        true
+      )
+    )
+    val clusterEnvironment = new TestStreamEnvironment(cluster.get, parallelism)
+  }
+
+  override protected def afterAll(): Unit = {
+    cluster.foreach {
+      TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c2791b0/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
new file mode 100644
index 0000000..d5e2b7b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.functions.{RichMapFunction, FoldFunction}
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.rules.TemporaryFolder
+import org.junit.{After, Before, Rule, Test}
+
+class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
+
+  var resultPath1: String = _
+  var resultPath2: String = _
+  var expected1: String = _
+  var expected2: String = _
+
+  val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    val temp = tempFolder
+    resultPath1 = temp.newFile.toURI.toString
+    resultPath2 = temp.newFile.toURI.toString
+    expected1 = ""
+    expected2 = ""
+  }
+
+  @After
+  def after(): Unit = {
+    TestBaseUtils.compareResultsByLinesInMemory(expected1, resultPath1)
+    TestBaseUtils.compareResultsByLinesInMemory(expected2, resultPath2)
+  }
+
+  /** Tests the streaming fold operation. For this purpose a stream of Tuple[Int, Int] is created.
+    * The stream is grouped by the first field. For each group, the resulting stream is folded by
+    * summing up the second tuple field.
+    *
+    */
+  @Test
+  def testFoldOperator(): Unit = {
+    val numElements = 10
+    val numKeys = 2
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
+
+      override def run(ctx: SourceContext[(Int, Int)]): Unit = {
+        0 until numElements foreach {
+          i => ctx.collect((i % numKeys, i))
+        }
+      }
+
+      override def cancel(): Unit = {}
+    })
+
+    val splittedResult = sourceStream
+      .groupBy(0)
+      .fold(0, new FoldFunction[(Int, Int), Int] {
+        override def fold(accumulator: Int, value: (Int, Int)): Int = {
+          accumulator + value._2
+        }
+      })
+      .map(new RichMapFunction[Int, (Int, Int)] {
+        override def map(value: Int): (Int, Int) = {
+          (getRuntimeContext.getIndexOfThisSubtask, value)
+        }
+      })
+      .split{
+        x =>
+          Seq(x._1.toString)
+      }
+
+    splittedResult
+      .select("0")
+      .map(_._2)
+      .getJavaStream
+      .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)
+    splittedResult
+      .select("1")
+      .map(_._2)
+      .getJavaStream
+      .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)
+
+    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
+
+    expected1 = groupedSequence(0).scanLeft(0)(_ + _).tail.mkString("\n")
+    expected2 = groupedSequence(1).scanLeft(0)(_ + _).tail.mkString("\n")
+
+    env.execute()
+  }
+}


[2/3] flink git commit: [FLINK-2645] [jobmanager] Fail job execution if final accumulators cannot be merged and forward exceptions.

Posted by fh...@apache.org.
[FLINK-2645] [jobmanager] Fail job execution if final accumulators cannot be merged and forward exceptions.

This closes #1112


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16fb4e91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16fb4e91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16fb4e91

Branch: refs/heads/master
Commit: 16fb4e919f4a76b8fe4910435b2183fa172f6e24
Parents: 9c2791b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 9 13:49:40 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:35:34 2015 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  15 +-
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../accumulators/AccumulatorErrorITCase.java    | 193 +++++++++++++++++++
 4 files changed, 207 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a44fc82..1d8a37c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -827,7 +827,7 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
 
-	void jobVertexInFinalState(ExecutionJobVertex ev) {
+	void jobVertexInFinalState() {
 		synchronized (progressLock) {
 			if (numFinishedJobVertices >= verticesInCreationOrder.size()) {
 				throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
@@ -927,19 +927,18 @@ public class ExecutionGraph implements Serializable {
 				case RUNNING:
 					return attempt.switchToRunning();
 				case FINISHED:
-					Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
-					Map<String, Accumulator<?, ?>> userAccumulators = null;
 					try {
 						AccumulatorSnapshot accumulators = state.getAccumulators();
-						flinkAccumulators = accumulators.deserializeFlinkAccumulators();
-						userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
+						Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
+							accumulators.deserializeFlinkAccumulators();
+						Map<String, Accumulator<?, ?>> userAccumulators =
+							accumulators.deserializeUserAccumulators(userClassLoader);
+						attempt.markFinished(flinkAccumulators, userAccumulators);
 					}
 					catch (Exception e) {
-						// we do not fail the job on deserialization problems of accumulators, but only log
 						LOG.error("Failed to deserialize final accumulator results.", e);
+						attempt.markFailed(e);
 					}
-
-					attempt.markFinished(flinkAccumulators, userAccumulators);
 					return true;
 				case CANCELED:
 					attempt.cancelingComplete();

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index dea619a..999ca1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -482,7 +482,7 @@ public class ExecutionJobVertex implements Serializable {
 					stateMonitor.notifyAll();
 					
 					// tell the graph
-					graph.jobVertexInFinalState(this);
+					graph.jobVertexInFinalState();
 				} else {
 					numSubtasksInFinalState++;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 18c453f..27fc6e3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -384,10 +384,14 @@ class JobManager(
               newJobStatus match {
                 case JobStatus.FINISHED =>
                   val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
-                  executionGraph.getAccumulatorsSerialized()
+                    executionGraph.getAccumulatorsSerialized()
                   } catch {
                     case e: Exception =>
-                      log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
+                      log.error(s"Cannot fetch final accumulators for job $jobID", e)
+                      val exception = new JobExecutionException(jobID,
+                        "Failed to retrieve accumulator results.", e)
+                      jobInfo.client ! decorateMessage(JobResultFailure(
+                        new SerializedThrowable(exception)))
                       Collections.emptyMap()
                   }
                 val result = new SerializedJobExecutionResult(

http://git-wip-us.apache.org/repos/asf/flink/blob/16fb4e91/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
new file mode 100644
index 0000000..cac8451
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests cases where an Accumulator
+ *  a) throws errors during runtime
+ *  b) is not compatible with an existing accumulator
+ */
+public class AccumulatorErrorITCase {
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			cluster.start();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFaultyAccumulator() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.getConfig().disableSysoutLogging();
+
+		// Test Exception forwarding with faulty Accumulator implementation
+		DataSet<Long> input = env.generateSequence(0, 10000);
+
+		DataSet<Long> map = input.map(new FaultyAccumulatorUsingMapper());
+
+		map.output(new DiscardingOutputFormat<Long>());
+
+		try {
+			env.execute();
+			fail("Should have failed.");
+		} catch (ProgramInvocationException e) {
+			Assert.assertTrue("Exception should be passed:",
+					e.getCause() instanceof JobExecutionException);
+			Assert.assertTrue("Root cause should be:",
+					e.getCause().getCause() instanceof CustomException);
+		}
+	}
+
+
+	@Test
+	public void testInvalidTypeAccumulator() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.getConfig().disableSysoutLogging();
+
+		// Test Exception forwarding with incompatible Accumulator types registered under the same name
+		DataSet<Long> input = env.generateSequence(0, 10000);
+
+		DataSet<Long> mappers = input.map(new IncompatibleAccumulatorTypesMapper())
+				.map(new IncompatibleAccumulatorTypesMapper2());
+
+		mappers.output(new DiscardingOutputFormat<Long>());
+
+		try {
+			env.execute();
+			fail("Should have failed.");
+		} catch (ProgramInvocationException e) {
+			Assert.assertTrue("Exception should be passed:",
+					e.getCause() instanceof JobExecutionException);
+			Assert.assertTrue("Root cause should be:",
+					e.getCause().getCause() instanceof Exception);
+			Assert.assertTrue("Root cause should be:",
+					e.getCause().getCause().getCause() instanceof UnsupportedOperationException);
+		}
+	}
+
+	/* testFaultyAccumulator */
+
+	private static class FaultyAccumulatorUsingMapper extends RichMapFunction<Long, Long> {
+
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new FaultyAccumulator());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L;
+		}
+	}
+
+	private static class FaultyAccumulator extends LongCounter {
+
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public LongCounter clone() {
+			throw new CustomException();
+		}
+	}
+
+	private static class CustomException extends RuntimeException {
+		private static final long serialVersionUID = 42;
+	}
+
+	/* testInvalidTypeAccumulator */
+
+	private static class IncompatibleAccumulatorTypesMapper extends RichMapFunction<Long, Long> {
+
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new LongCounter());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L;
+		}
+	}
+
+	private static class IncompatibleAccumulatorTypesMapper2 extends RichMapFunction<Long, Long> {
+
+		private static final long serialVersionUID = 42;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator("test", new DoubleCounter());
+		}
+
+		@Override
+		public Long map(Long value) throws Exception {
+			return -1L;
+		}
+	}
+
+}


[3/3] flink git commit: [FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats

Posted by fh...@apache.org.
[FLINK-2617] [hadoop-compat] Added static mutexes for configure, open, close HadoopFormats

This closes #1111


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8754352f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8754352f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8754352f

Branch: refs/heads/master
Commit: 8754352ff53cd1ab621d6c97f7e5baac369b5c28
Parents: 16fb4e9
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Sep 9 14:32:21 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 10 12:35:48 2015 +0200

----------------------------------------------------------------------
 .../hadoop/mapred/HadoopInputFormatBase.java    |  46 ++++--
 .../hadoop/mapred/HadoopOutputFormatBase.java   | 101 +++++++------
 .../hadoop/mapreduce/HadoopInputFormatBase.java |  55 ++++---
 .../mapreduce/HadoopOutputFormatBase.java       | 142 +++++++++++--------
 4 files changed, 212 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 932b7de..356f7ad 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs, which is why Hadoop InputFormats might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
 	protected Class<K> keyClass;
 	protected Class<V> valueClass;
@@ -91,12 +99,15 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR InputFormat if necessary
-		if(this.mapredInputFormat instanceof Configurable) {
-			((Configurable)this.mapredInputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredInputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR InputFormat if necessary
+			if (this.mapredInputFormat instanceof Configurable) {
+				((Configurable) this.mapredInputFormat).setConf(this.jobConf);
+			} else if (this.mapredInputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredInputFormat).configure(this.jobConf);
+			}
 		}
 	}
 	
@@ -148,13 +159,18 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		if (this.recordReader instanceof Configurable) {
-			((Configurable) this.recordReader).setConf(jobConf);
+
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+			if (this.recordReader instanceof Configurable) {
+				((Configurable) this.recordReader).setConf(jobConf);
+			}
+			key = this.recordReader.createKey();
+			value = this.recordReader.createValue();
+			this.fetched = false;
 		}
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
 	}
 	
 	@Override
@@ -172,7 +188,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index 456003f..40214f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -54,6 +54,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs, which is why Hadoop OutputFormats might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -77,12 +85,15 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	@Override
 	public void configure(Configuration parameters) {
-		// configure MR OutputFormat if necessary
-		if(this.mapredOutputFormat instanceof Configurable) {
-			((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
-		}
-		else if(this.mapredOutputFormat instanceof JobConfigurable) {
-			((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			// configure MR OutputFormat if necessary
+			if (this.mapredOutputFormat instanceof Configurable) {
+				((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
+			} else if (this.mapredOutputFormat instanceof JobConfigurable) {
+				((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
+			}
 		}
 	}
 
@@ -94,39 +105,43 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
 
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
+
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
+
+			this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter = this.jobConf.getOutputCommitter();
+
+			JobContext jobContext;
+			try {
+				jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.outputCommitter.setupJob(jobContext);
+
+			this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 		}
-
-		this.outputCommitter = this.jobConf.getOutputCommitter();
-
-		JobContext jobContext;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.outputCommitter.setupJob(jobContext);
-
-		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
 	}
 
 	/**
@@ -135,10 +150,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-		
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordWriter.close(new HadoopDummyReporter());
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 09435e2..e9b23f7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -59,6 +59,14 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
 
+	// Mutexes to avoid concurrent operations on Hadoop InputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop InputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	// NOTE: this class is using a custom serialization logic, without a defaultWriteObject() method.
 	// Hence, all fields here are "transient".
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
@@ -89,8 +97,12 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void configure(Configuration parameters) {
-		if (mapreduceInputFormat instanceof Configurable) {
-			((Configurable) mapreduceInputFormat).setConf(configuration);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (mapreduceInputFormat instanceof Configurable) {
+				((Configurable) mapreduceInputFormat).setConf(configuration);
+			}
 		}
 	}
 
@@ -169,21 +181,26 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
 
-		try {
-			this.recordReader = this.mapreduceInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+
+			TaskAttemptContext context;
+			try {
+				context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			try {
+				this.recordReader = this.mapreduceInputFormat
+						.createRecordReader(split.getHadoopInputSplit(), context);
+				this.recordReader.initialize(split.getHadoopInputSplit(), context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordReader.", e);
+			} finally {
+				this.fetched = false;
+			}
 		}
 	}
 
@@ -207,7 +224,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public void close() throws IOException {
-		this.recordReader.close();
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			this.recordReader.close();
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8754352f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 72c105b..dc475e8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -49,6 +49,14 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	private static final long serialVersionUID = 1L;
 
+	// Mutexes to avoid concurrent operations on Hadoop OutputFormats.
+	// Hadoop parallelizes tasks across JVMs which is why they might rely on this JVM isolation.
+	// In contrast, Flink parallelizes using Threads, so multiple Hadoop OutputFormat instances
+	// might be used in the same JVM.
+	private static final Object OPEN_MUTEX = new Object();
+	private static final Object CONFIGURE_MUTEX = new Object();
+	private static final Object CLOSE_MUTEX = new Object();
+
 	private org.apache.hadoop.conf.Configuration configuration;
 	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
 	protected transient RecordWriter<K,V> recordWriter;
@@ -73,8 +81,12 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 
 	@Override
 	public void configure(Configuration parameters) {
-		if(this.mapreduceOutputFormat instanceof Configurable){
-			((Configurable)this.mapreduceOutputFormat).setConf(this.configuration);
+
+		// enforce sequential configure() calls
+		synchronized (CONFIGURE_MUTEX) {
+			if (this.mapreduceOutputFormat instanceof Configurable) {
+				((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
+			}
 		}
 	}
 
@@ -86,49 +98,53 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		this.taskNumber = taskNumber+1;
 
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.output.basename", "tmp");
+		// enforce sequential open() calls
+		synchronized (OPEN_MUTEX) {
+			if (Integer.toString(taskNumber + 1).length() > 6) {
+				throw new IOException("Task id too large.");
+			}
 
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
-				+ Integer.toString(taskNumber + 1)
-				+ "_0");
+			this.taskNumber = taskNumber + 1;
 
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-			this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
-			this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.output.basename", "tmp");
 
-		this.context.getCredentials().addAll(this.credentials);
-		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if(currentUserCreds != null) {
-			this.context.getCredentials().addAll(currentUserCreds);
-		}
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+					+ Integer.toString(taskNumber + 1)
+					+ "_0");
 
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		if(outputCommitter instanceof FileOutputCommitter) {
-			this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter)this.outputCommitter).getWorkPath().toString());
-		}
-
-		try {
-			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordWriter.", e);
+			this.configuration.set("mapred.task.id", taskAttemptID.toString());
+			this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+			// for hadoop 2.2
+			this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+			this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+
+			try {
+				this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+				this.outputCommitter = this.mapreduceOutputFormat.getOutputCommitter(this.context);
+				this.outputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+
+			this.context.getCredentials().addAll(this.credentials);
+			Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+			if (currentUserCreds != null) {
+				this.context.getCredentials().addAll(currentUserCreds);
+			}
+
+			// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+			if (outputCommitter instanceof FileOutputCommitter) {
+				this.configuration.set("mapreduce.task.output.dir", ((FileOutputCommitter) this.outputCommitter).getWorkPath().toString());
+			}
+
+			try {
+				this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not create RecordWriter.", e);
+			}
 		}
 	}
 
@@ -138,27 +154,31 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormat
 	 */
 	@Override
 	public void close() throws IOException {
-		try {
-			this.recordWriter.close(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not close RecordReader.", e);
-		}
-		
-		if (this.outputCommitter.needsTaskCommit(this.context)) {
-			this.outputCommitter.commitTask(this.context);
-		}
-		
-		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-		
-		// rename tmp-file to final name
-		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-		
-		String taskNumberStr = Integer.toString(this.taskNumber);
-		String tmpFileTemplate = "tmp-r-00000";
-		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-		
-		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+
+		// enforce sequential close() calls
+		synchronized (CLOSE_MUTEX) {
+			try {
+				this.recordWriter.close(this.context);
+			} catch (InterruptedException e) {
+				throw new IOException("Could not close RecordReader.", e);
+			}
+
+			if (this.outputCommitter.needsTaskCommit(this.context)) {
+				this.outputCommitter.commitTask(this.context);
+			}
+
+			Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+			// rename tmp-file to final name
+			FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+			String taskNumberStr = Integer.toString(this.taskNumber);
+			String tmpFileTemplate = "tmp-r-00000";
+			String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
+
+			if (fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) {
+				fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr));
+			}
 		}
 	}