Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/08 10:30:14 UTC

[1/2] incubator-flink git commit: [FLINK-1139] Added FinalizeOnMaster hook to run code after the last task of an OutputFormat completed

Repository: incubator-flink
Updated Branches:
  refs/heads/master 770ce2315 -> 15f58bb23


[FLINK-1139] Added FinalizeOnMaster hook to run code after the last task of an OutputFormat completed


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/63eeafc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/63eeafc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/63eeafc8

Branch: refs/heads/master
Commit: 63eeafc87630d27589106a4d6d1131f8993cfb66
Parents: 770ce23
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Oct 6 16:27:25 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Dec 8 10:24:14 2014 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/FinalizeOnMaster.java   |  36 ++++++
 .../runtime/executiongraph/ExecutionGraph.java  |  17 ++-
 .../executiongraph/ExecutionJobVertex.java      |   9 ++
 .../runtime/jobgraph/AbstractJobVertex.java     |   9 ++
 .../runtime/jobgraph/OutputFormatVertex.java    |  35 ++++++
 .../flink/runtime/jobmanager/JobManager.java    |  16 +--
 .../ExecutionGraphDeploymentTest.java           | 115 ++++++++++++++++---
 7 files changed, 212 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
new file mode 100644
index 0000000..6fa535c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.io.IOException;
+
+/**
+ * This interface may be implemented by {@link OutputFormat}s to have the master finalize them globally.
+ * 
+ */
+public interface FinalizeOnMaster {
+
+	/**
+	 * The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat have finished.
+	 * 
+	 * @param parallelism The degree of parallelism with which the format was run.
+	 * @throws IOException The finalization may throw exceptions, which cause the job to fail.
+	 */
+	void finalizeGlobal(int parallelism) throws IOException;
+}

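For illustration, this is how an OutputFormat can use the new hook: each parallel subtask writes its own output, and the master performs a single global commit once all of them have finished. A minimal sketch, not part of this commit; the StagingOutputFormat class and its staging-file scheme are hypothetical.

import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

/** Hypothetical format: every subtask writes a staging file, the master commits once. */
public class StagingOutputFormat implements OutputFormat<String>, FinalizeOnMaster {

	private static final long serialVersionUID = 1L;

	private int taskNumber;

	@Override
	public void configure(Configuration parameters) {}

	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		this.taskNumber = taskNumber;
		// open a staging file, e.g. "part-" + taskNumber + ".tmp"
	}

	@Override
	public void writeRecord(String record) throws IOException {
		// append the record to this subtask's staging file
	}

	@Override
	public void close() throws IOException {
		// flush and close this subtask's staging file
	}

	@Override
	public void finalizeGlobal(int parallelism) throws IOException {
		// invoked once on the JobManager after all 'parallelism' subtasks
		// reached a final state: rename the staging files to their final names
	}
}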
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 74e48c8..29f157c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -72,6 +72,9 @@ public class ExecutionGraph {
 	/** The job configuration that was originally attached to the JobGraph. */
 	private final Configuration jobConfiguration;
 	
+	/** The classloader of the user code. */
+	private final ClassLoader userClassLoader;
+	
 	/** All job vertices that are part of this graph */
 	private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
 	
@@ -124,14 +127,20 @@ public class ExecutionGraph {
 	}
 	
 	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
-						List<BlobKey> requiredJarFiles,ExecutorService executor) {
-		if (jobId == null || jobName == null || jobConfig == null) {
+						List<BlobKey> requiredJarFiles, ExecutorService executor) {
+		this(jobId, jobName, jobConfig, requiredJarFiles, Thread.currentThread().getContextClassLoader(), null);
+	}
+	
+	public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, 
+			List<BlobKey> requiredJarFiles, ClassLoader userClassLoader, ExecutorService executor) {
+		if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
 			throw new NullPointerException();
 		}
 		
 		this.jobID = jobId;
 		this.jobName = jobName;
 		this.jobConfiguration = jobConfig;
+		this.userClassLoader = userClassLoader;
 		this.executor = executor;
 		
 		this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
@@ -226,6 +235,10 @@ public class ExecutionGraph {
 		return jobConfiguration;
 	}
 	
+	public ClassLoader getUserClassLoader() {
+		return this.userClassLoader;
+	}
+	
 	public JobStatus getState() {
 		return state;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 73534f5..9f8b56a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -326,6 +326,15 @@ public class ExecutionJobVertex {
 				numSubtasksInFinalState++;
 				
 				if (numSubtasksInFinalState == parallelism) {
+					
+					// call finalizeOnMaster hook
+					try {
+						getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
+					}
+					catch (Throwable t) {
+						getGraph().fail(t);
+					}
+					
 					// we are in our final state
 					stateMonitor.notifyAll();
 					

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 0ce07ed..b9e1eac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -374,6 +374,15 @@ public class AbstractJobVertex implements java.io.Serializable {
 	 */
 	public void initializeOnMaster(ClassLoader loader) throws Exception {}
 	
+	/**
+	 * A hook that can be overridden by subclasses to implement logic that is called by the
+	 * master after all (parallel) subtasks of this vertex have reached a final state.
+	 * 
+	 * @param loader The class loader for user defined code.
+	 * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+	 */
+	public void finalizeOnMaster(ClassLoader loader) throws Exception {}
+	
 	// --------------------------------------------------------------------------------------------
 
 	@Override

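The hook is not limited to output formats: any vertex can override it, as the new test below does with a vertex whose hook throws. A sketch of a benign override (the TempCleanupVertex class is hypothetical):

public class TempCleanupVertex extends AbstractJobVertex {

	private static final long serialVersionUID = 1L;

	public TempCleanupVertex(String name) {
		super(name);
	}

	@Override
	public void finalizeOnMaster(ClassLoader loader) throws Exception {
		// runs on the JobManager once all subtasks of this vertex have finished;
		// throwing here fails the whole job
	}
}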
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 708b390..2a1f89c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -83,4 +84,38 @@ public class OutputFormatVertex extends AbstractJobVertex {
 			((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
 		}
 	}
+	
+	@Override
+	public void finalizeOnMaster(ClassLoader loader) throws Exception {
+		final TaskConfig cfg = new TaskConfig(getConfiguration());
+
+		UserCodeWrapper<OutputFormat<?>> wrapper;
+		try {
+			wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		if (wrapper == null) {
+			throw new Exception("No input format present in InputFormatVertex's task configuration.");
+		}
+
+		OutputFormat<?> outputFormat;
+		try {
+			outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+		}
+		catch (Throwable t) {
+			throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		try {
+			outputFormat.configure(cfg.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+		}
+		
+		if (outputFormat instanceof FinalizeOnMaster) {
+			((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 95287d1..0cd08f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -330,15 +330,21 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			// Register this job with the library cache manager
 			libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys());
 			
+			// grab the class loader for user-defined code
+			final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
+			if (userCodeLoader == null) {
+				throw new JobException("The user code class loader could not be initialized.");
+			}
+			
 			// get the existing execution graph (if we attach), or construct a new empty one to attach
 			executionGraph = this.currentJobs.get(job.getJobID());
 			if (executionGraph == null) {
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
 				}
-				
+
 				executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
-						job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+						job.getJobConfiguration(), job.getUserJarBlobKeys(), userCodeLoader, this.executorService);
 
 				executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
 						job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
@@ -358,12 +364,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			// Register for updates on the job status
 			executionGraph.registerJobStatusListener(this);
 			
-			// grab the class loader for user-defined code
-			final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
-			if (userCodeLoader == null) {
-				throw new JobException("The user code class loader could not be initialized.");
-			}
-
 			// first, perform the master initialization of the nodes
 			if (LOG.isDebugEnabled()) {
 				LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 4cddcbd..14ce15a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,12 +22,15 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -45,9 +48,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.protocols.TaskOperationProtocol;
 import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-
 import org.junit.Test;
-
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -150,7 +151,14 @@ public class ExecutionGraphDeploymentTest {
 	@Test
 	public void testRegistrationOfExecutionsFinishing() {
 		try {
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(7650, 2350);
+			
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);
 			
 			for (Execution e : executions.values()) {
 				e.markFinished();
@@ -167,7 +175,14 @@ public class ExecutionGraphDeploymentTest {
 	@Test
 	public void testRegistrationOfExecutionsFailing() {
 		try {
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+			
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
 			
 			for (Execution e : executions.values()) {
 				e.markFailed(null);
@@ -184,7 +199,14 @@ public class ExecutionGraphDeploymentTest {
 	@Test
 	public void testRegistrationOfExecutionsFailedExternally() {
 		try {
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+			
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
 			
 			for (Execution e : executions.values()) {
 				e.fail(null);
@@ -201,7 +223,14 @@ public class ExecutionGraphDeploymentTest {
 	@Test
 	public void testRegistrationOfExecutionsCanceled() {
 		try {
-			Map<ExecutionAttemptID, Execution> executions = setupExecution(19, 37);
+			
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);
 			
 			for (Execution e : executions.values()) {
 				e.cancel();
@@ -216,15 +245,52 @@ public class ExecutionGraphDeploymentTest {
 		}
 	}
 	
-	private Map<ExecutionAttemptID, Execution> setupExecution(int dop1, int dop2) throws Exception {
+	@Test
+	public void testRegistrationOfExecutionsFailingFinalize() {
+		try {
+			
+			final JobVertexID jid1 = new JobVertexID();
+			final JobVertexID jid2 = new JobVertexID();
+			
+			AbstractJobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
+			AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+			
+			Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4);
+			
+			List<Execution> execList = new ArrayList<Execution>();
+			execList.addAll(executions.values());
+			// sort executions by job vertex. Failing job vertex first
+			Collections.sort(execList, new Comparator<Execution>() {
+				@Override
+				public int compare(Execution o1, Execution o2) {
+					return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName());
+				}
+			});
+			
+			int cnt = 0;
+			for (Execution e : execList) {
+				cnt++;
+				e.markFinished();
+				if(cnt <= 6) {
+					// the last execution of the first job vertex triggers the failing finalize hook
+					assertEquals(ExecutionState.FINISHED, e.getState());
+				} else {
+					// all following executions should be canceled
+					assertEquals(ExecutionState.CANCELED, e.getState());
+				}
+			}
+			
+			assertEquals(0, executions.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private Map<ExecutionAttemptID, Execution> setupExecution(AbstractJobVertex v1, int dop1, AbstractJobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 		
-		final JobVertexID jid1 = new JobVertexID();
-		final JobVertexID jid2 = new JobVertexID();
-		
-		AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
-		AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
-		
 		v1.setParallelism(dop1);
 		v2.setParallelism(dop2);
 		
@@ -269,4 +335,23 @@ public class ExecutionGraphDeploymentTest {
 		
 		return executions;
 	}
+	
+	@SuppressWarnings("serial")
+	public static class FailingFinalizeJobVertex extends AbstractJobVertex {
+
+		public FailingFinalizeJobVertex(String name) {
+			super(name);
+		}
+		
+		public FailingFinalizeJobVertex(String name, JobVertexID id) {
+			super(name, id);
+		}
+		
+		@Override
+		public void finalizeOnMaster(ClassLoader cl) throws Exception {
+			throw new Exception();
+		}
+		
+		
+	}
 }


[2/2] incubator-flink git commit: [FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1

Posted by fh...@apache.org.
[FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1

This closes #173


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15f58bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15f58bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15f58bb2

Branch: refs/heads/master
Commit: 15f58bb23c657951e2de48e6436820a093b393e7
Parents: 63eeafc
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Oct 6 16:28:00 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Dec 8 10:26:55 2014 +0100

----------------------------------------------------------------------
 .../mapred/HadoopOutputFormat.java              | 20 ++++++-
 .../mapreduce/HadoopOutputFormat.java           | 52 +++++++++++------
 .../mapreduce/example/WordCount.java            |  1 -
 .../mapred/HadoopMapredITCase.java              |  2 +-
 .../mapreduce/HadoopInputOutputITCase.java      |  3 +-
 .../common/operators/GenericDataSinkBase.java   | 12 ++++
 .../flink/test/util/AbstractTestBase.java       | 60 +++++++++++++++-----
 7 files changed, 113 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index 434b409..64c539b 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -40,11 +41,11 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
 	
 	private static final long serialVersionUID = 1L;
 	
-	private JobConf jobConf;	
+	private JobConf jobConf;
 	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;	
 	private transient RecordWriter<K,V> recordWriter;	
 	private transient FileOutputCommitter fileOutputCommitter;
@@ -141,7 +142,20 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
 			this.fileOutputCommitter.commitTask(this.context);
 		}
-		this.fileOutputCommitter.commitJob(this.jobContext);
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		try {
+			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			
+			// finalize HDFS output format
+			fileOutputCommitter.commitJob(jobContext);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

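A note on usage: since the format now commits the job in finalizeGlobal() on the master instead of in close(), it can safely run with DOP > 1, and the wiring in user programs is unchanged. A sketch, assuming a DataSet<Tuple2<Text, IntWritable>> named result and an output path string:

import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// each subtask commits its own files in close(); finalizeGlobal() then
// commits the job once on the master, so DOP > 1 is safe
JobConf jobConf = new JobConf();
HadoopOutputFormat<Text, IntWritable> format =
		new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
result.output(format);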
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index fa8823b..402372c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -22,18 +22,17 @@ package org.apache.flink.hadoopcompatibility.mapreduce;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -41,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
 	
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 	private transient RecordWriter<K,V> recordWriter;
 	private transient FileOutputCommitter fileOutputCommitter;
 	private transient TaskAttemptContext context;
+	private transient int taskNumber;
 	
 	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
 		super();
@@ -95,6 +95,8 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 			throw new IOException("Task id too large.");
 		}
 		
+		this.taskNumber = taskNumber+1;
+		
 		// for hadoop 2.2
 		this.configuration.set("mapreduce.output.basename", "tmp");
 		
@@ -158,28 +160,42 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
 			this.fileOutputCommitter.commitTask(this.context);
 		}
-		this.fileOutputCommitter.commitJob(this.context);
-		
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
 		
-		// rename tmp-* files to final name
+		// rename tmp-file to final name
 		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
 		
-		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
+		String taskNumberStr = Integer.toString(this.taskNumber);
+		String tmpFileTemplate = "tmp-r-00000";
+		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
 		
-		// isDirectory does not work in hadoop 1
-		if(fs.getFileStatus(outputPath).isDir()) {
-			FileStatus[] files = fs.listStatus(outputPath);
+		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+		}
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		JobContext jobContext;
+		TaskAttemptContext taskContext;
+		try {
+			
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") 
+					+ Integer.toString(1) 
+					+ "_0");
 			
-			for(FileStatus f : files) {
-				Matcher m = p.matcher(f.getPath().getName());
-				if(m.matches()) {
-					int part = Integer.valueOf(m.group(2));
-					fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
-				}
-			}
+			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
 		}
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+		
+		// finalize HDFS output format
+		this.fileOutputCommitter.commitJob(jobContext);
 	}
 	
 	// --------------------------------------------------------------------------------------------

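The renaming in close() pads the 1-based task number (note the taskNumber+1 offset in open()) into Hadoop's tmp-r-NNNNN naming before moving the file to a plain numeric name. A worked example of the arithmetic for this.taskNumber == 3:

// padding from close(), for this.taskNumber == 3
String taskNumberStr = Integer.toString(3);        // "3"
String tmpFileTemplate = "tmp-r-00000";            // 11 characters
String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr;
// tmpFile == "tmp-r-00003"; the file is then renamed to "3" in the output directory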
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 53a6e6b..2b99fd2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -55,7 +55,6 @@ public class WordCount {
 		final String outputPath = args[1];
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
 		
 		// Set up the Hadoop Input Format
 		Job job = Job.getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index f8c3bab..b6650d2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -36,7 +36,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 000dc58..7eee629 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -32,11 +32,12 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
+		this.setDegreeOfParallelism(4);
 	}
 	
 	@Override
 	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index ca55855..a0f367e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -22,6 +22,8 @@ package org.apache.flink.api.common.operators;
 import java.util.List;
 
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -292,13 +294,23 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
 	
 	protected void executeOnCollections(List<IN> inputData) throws Exception {
 		OutputFormat<IN> format = this.formatWrapper.getUserCodeObject();
+		
+		if(format instanceof InitializeOnMaster) {
+			((InitializeOnMaster)format).initializeGlobal(1);
+		}
+		
 		format.configure(this.parameters);
 		
 		format.open(0, 1);
 		for (IN element : inputData) {
 			format.writeRecord(element);
 		}
+		
 		format.close();
+		
+		if(format instanceof FinalizeOnMaster) {
+			((FinalizeOnMaster)format).finalizeGlobal(1);
+		}
 	}
 	
 	// --------------------------------------------------------------------------------------------

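Collection-based execution now emulates the full master lifecycle at parallelism 1. A sketch that makes the resulting call order visible (the PrintOrderFormat class is hypothetical):

import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

/** Hypothetical format that prints the lifecycle order seen in executeOnCollections. */
public class PrintOrderFormat implements OutputFormat<String>, InitializeOnMaster, FinalizeOnMaster {

	private static final long serialVersionUID = 1L;

	@Override
	public void initializeGlobal(int parallelism) throws IOException {
		System.out.println("1. initializeGlobal(" + parallelism + ")");      // master-side setup
	}

	@Override
	public void configure(Configuration parameters) {
		System.out.println("2. configure()");
	}

	@Override
	public void open(int taskNumber, int numTasks) {
		System.out.println("3. open(" + taskNumber + ", " + numTasks + ")"); // always (0, 1) here
	}

	@Override
	public void writeRecord(String record) {
		System.out.println("4. writeRecord(...)");                           // once per input element
	}

	@Override
	public void close() {
		System.out.println("5. close()");
	}

	@Override
	public void finalizeGlobal(int parallelism) throws IOException {
		System.out.println("6. finalizeGlobal(" + parallelism + ")");        // master-side commit
	}
}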
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c043ea8..1fd0e6e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -199,11 +200,11 @@ public abstract class AbstractTestBase {
 	// --------------------------------------------------------------------------------------------
 	
 	public BufferedReader[] getResultReader(String resultPath) throws IOException {
-		return getResultReader(resultPath, false);
+		return getResultReader(resultPath, new String[]{}, false);
 	}
 	
-	public BufferedReader[] getResultReader(String resultPath, boolean inOrderOfFiles) throws IOException {
-		File[] files = getAllInvolvedFiles(resultPath);
+	public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		
 		if (inOrderOfFiles) {
 			// sort the files after their name (1, 2, 3, 4)...
@@ -232,8 +233,14 @@ public abstract class AbstractTestBase {
 		return readers;
 	}
 	
+	
+	
 	public BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
-		File[] files = getAllInvolvedFiles(resultPath);
+		return getResultInputStream(resultPath, new String[]{});
+	}
+	
+	public BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException {
+		File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 		BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
 		for (int i = 0; i < files.length; i++) {
 			inStreams[i] = new BufferedInputStream(new FileInputStream(files[i]));
@@ -242,11 +249,15 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void readAllResultLines(List<String> target, String resultPath) throws IOException {
-		readAllResultLines(target, resultPath, false);
+		readAllResultLines(target, resultPath, new String[]{});
 	}
 	
-	public void readAllResultLines(List<String> target, String resultPath, boolean inOrderOfFiles) throws IOException {
-		for (BufferedReader reader : getResultReader(resultPath, inOrderOfFiles)) {
+	public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException {
+		readAllResultLines(target, resultPath, excludePrefixes, false);
+	}
+	
+	public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+		for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
 			String s = null;
 			while ((s = reader.readLine()) != null) {
 				target.add(s);
@@ -255,8 +266,12 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
+		compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
+	}
+	
+	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, false);
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		Arrays.sort(result);
@@ -267,10 +282,13 @@ public abstract class AbstractTestBase {
 		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
 		Assert.assertArrayEquals(expected, result);
 	}
-	
 	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception {
+		compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
+	}
+	
+	public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, true);
+		readAllResultLines(list, resultPath, excludePrefixes, true);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		
@@ -281,8 +299,12 @@ public abstract class AbstractTestBase {
 	}
 	
 	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception {
+		compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+	}
+	
+	public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
 		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, false);
+		readAllResultLines(list, resultPath, excludePrefixes, false);
 		
 		String[] result = (String[]) list.toArray(new String[list.size()]);
 		String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
@@ -314,13 +336,25 @@ public abstract class AbstractTestBase {
 		}
 	}
 	
-	private File[] getAllInvolvedFiles(String resultPath) {
+	private File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
+		final String[] exPrefs = excludePrefixes;
 		File result = asFile(resultPath);
 		if (!result.exists()) {
 			Assert.fail("Result file was not written");
 		}
 		if (result.isDirectory()) {
-			return result.listFiles();
+			return result.listFiles(new FilenameFilter() {
+				
+				@Override
+				public boolean accept(File dir, String name) {
+					for(String p: exPrefs) {
+						if(name.startsWith(p)) {
+							return false;
+						}
+					}
+					return true;
+				}
+			});
 		} else {
 			return new File[] { result };
 		}