You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text export.)
Posted to commits@flink.apache.org by fh...@apache.org on 2014/12/08 10:30:14 UTC
[1/2] incubator-flink git commit: [FLINK-1139] Added FinalizeOnMaster hook to run code after the last task of an OutputFormat completed
Repository: incubator-flink
Updated Branches:
refs/heads/master 770ce2315 -> 15f58bb23
[FLINK-1139] Added FinalizeOnMaster hook to run code after the last task of an OutputFormat completed
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/63eeafc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/63eeafc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/63eeafc8
Branch: refs/heads/master
Commit: 63eeafc87630d27589106a4d6d1131f8993cfb66
Parents: 770ce23
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Oct 6 16:27:25 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Dec 8 10:24:14 2014 +0100
----------------------------------------------------------------------
.../flink/api/common/io/FinalizeOnMaster.java | 36 ++++++
.../runtime/executiongraph/ExecutionGraph.java | 17 ++-
.../executiongraph/ExecutionJobVertex.java | 9 ++
.../runtime/jobgraph/AbstractJobVertex.java | 9 ++
.../runtime/jobgraph/OutputFormatVertex.java | 35 ++++++
.../flink/runtime/jobmanager/JobManager.java | 16 +--
.../ExecutionGraphDeploymentTest.java | 115 ++++++++++++++++---
7 files changed, 212 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
new file mode 100644
index 0000000..6fa535c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FinalizeOnMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import java.io.IOException;
+
+/**
+ * This interface may be implemented by {@link OutputFormat}s to have the master finalize them globally.
+ *
+ */
+public interface FinalizeOnMaster {
+
+ /**
+ * The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished.
+ *
+ * @param parallelism The degree of parallelism with which the format was run.
+ * @throws IOException The finalization may throw exceptions, which may cause the job to abort.
+ */
+ void finalizeGlobal(int parallelism) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 74e48c8..29f157c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -72,6 +72,9 @@ public class ExecutionGraph {
/** The job configuration that was originally attached to the JobGraph. */
private final Configuration jobConfiguration;
+ /** The classloader of the user code. */
+ private final ClassLoader userClassLoader;
+
/** All job vertices that are part of this graph */
private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;
@@ -124,14 +127,20 @@ public class ExecutionGraph {
}
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
- List<BlobKey> requiredJarFiles,ExecutorService executor) {
- if (jobId == null || jobName == null || jobConfig == null) {
+ List<BlobKey> requiredJarFiles, ExecutorService executor) {
+ this(jobId, jobName, jobConfig, requiredJarFiles, Thread.currentThread().getContextClassLoader(), null);
+ }
+
+ public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig,
+ List<BlobKey> requiredJarFiles, ClassLoader userClassLoader, ExecutorService executor) {
+ if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
throw new NullPointerException();
}
this.jobID = jobId;
this.jobName = jobName;
this.jobConfiguration = jobConfig;
+ this.userClassLoader = userClassLoader;
this.executor = executor;
this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
@@ -226,6 +235,10 @@ public class ExecutionGraph {
return jobConfiguration;
}
+ public ClassLoader getUserClassLoader() {
+ return this.userClassLoader;
+ }
+
public JobStatus getState() {
return state;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 73534f5..9f8b56a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -326,6 +326,15 @@ public class ExecutionJobVertex {
numSubtasksInFinalState++;
if (numSubtasksInFinalState == parallelism) {
+
+ // call finalizeOnMaster hook
+ try {
+ getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
+ }
+ catch (Throwable t) {
+ getGraph().fail(t);
+ }
+
// we are in our final state
stateMonitor.notifyAll();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 0ce07ed..b9e1eac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -374,6 +374,15 @@ public class AbstractJobVertex implements java.io.Serializable {
*/
public void initializeOnMaster(ClassLoader loader) throws Exception {}
+ /**
+ * A hook that can be overridden by subclasses to implement logic that is called by the
+ * master after all parallel subtasks of this vertex have reached a final state.
+ *
+ * @param loader The class loader for user defined code.
+ * @throws Exception The method may throw exceptions which cause the job to fail immediately.
+ */
+ public void finalizeOnMaster(ClassLoader loader) throws Exception {}
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
index 708b390..2a1f89c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobgraph;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -83,4 +84,38 @@ public class OutputFormatVertex extends AbstractJobVertex {
((InitializeOnMaster) outputFormat).initializeGlobal(getParallelism());
}
}
+
+ @Override
+ public void finalizeOnMaster(ClassLoader loader) throws Exception {
+ final TaskConfig cfg = new TaskConfig(getConfiguration());
+
+ UserCodeWrapper<OutputFormat<?>> wrapper;
+ try {
+ wrapper = cfg.<OutputFormat<?>>getStubWrapper(loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Deserializing the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+ if (wrapper == null) {
+ throw new Exception("No input format present in InputFormatVertex's task configuration.");
+ }
+
+ OutputFormat<?> outputFormat;
+ try {
+ outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader);
+ }
+ catch (Throwable t) {
+ throw new Exception("Instantiating the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+ try {
+ outputFormat.configure(cfg.getStubParameters());
+ }
+ catch (Throwable t) {
+ throw new Exception("Configuring the OutputFormat (" + formatDescription + ") failed: " + t.getMessage(), t);
+ }
+
+ if (outputFormat instanceof FinalizeOnMaster) {
+ ((FinalizeOnMaster) outputFormat).finalizeGlobal(getParallelism());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 95287d1..0cd08f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -330,15 +330,21 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// Register this job with the library cache manager
libraryCacheManager.registerJob(job.getJobID(), job.getUserJarBlobKeys());
+ // grab the class loader for user-defined code
+ final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
+ if (userCodeLoader == null) {
+ throw new JobException("The user code class loader could not be initialized.");
+ }
+
// get the existing execution graph (if we attach), or construct a new empty one to attach
executionGraph = this.currentJobs.get(job.getJobID());
if (executionGraph == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')');
}
-
+
executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
- job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+ job.getJobConfiguration(), job.getUserJarBlobKeys(), userCodeLoader, this.executorService);
executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
@@ -358,12 +364,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// Register for updates on the job status
executionGraph.registerJobStatusListener(this);
- // grab the class loader for user-defined code
- final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(job.getJobID());
- if (userCodeLoader == null) {
- throw new JobException("The user code class loader could not be initialized.");
- }
-
// first, perform the master initialization of the nodes
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/63eeafc8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 4cddcbd..14ce15a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,12 +22,15 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@@ -45,9 +48,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
-
import org.junit.Test;
-
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -150,7 +151,14 @@ public class ExecutionGraphDeploymentTest {
@Test
public void testRegistrationOfExecutionsFinishing() {
try {
- Map<ExecutionAttemptID, Execution> executions = setupExecution(7650, 2350);
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350);
for (Execution e : executions.values()) {
e.markFinished();
@@ -167,7 +175,14 @@ public class ExecutionGraphDeploymentTest {
@Test
public void testRegistrationOfExecutionsFailing() {
try {
- Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
for (Execution e : executions.values()) {
e.markFailed(null);
@@ -184,7 +199,14 @@ public class ExecutionGraphDeploymentTest {
@Test
public void testRegistrationOfExecutionsFailedExternally() {
try {
- Map<ExecutionAttemptID, Execution> executions = setupExecution(7, 6);
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6);
for (Execution e : executions.values()) {
e.fail(null);
@@ -201,7 +223,14 @@ public class ExecutionGraphDeploymentTest {
@Test
public void testRegistrationOfExecutionsCanceled() {
try {
- Map<ExecutionAttemptID, Execution> executions = setupExecution(19, 37);
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37);
for (Execution e : executions.values()) {
e.cancel();
@@ -216,15 +245,52 @@ public class ExecutionGraphDeploymentTest {
}
}
- private Map<ExecutionAttemptID, Execution> setupExecution(int dop1, int dop2) throws Exception {
+ @Test
+ public void testRegistrationOfExecutionsFailingFinalize() {
+ try {
+
+ final JobVertexID jid1 = new JobVertexID();
+ final JobVertexID jid2 = new JobVertexID();
+
+ AbstractJobVertex v1 = new FailingFinalizeJobVertex("v1", jid1);
+ AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
+
+ Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 6, v2, 4);
+
+ List<Execution> execList = new ArrayList<Execution>();
+ execList.addAll(executions.values());
+ // sort executions by job vertex. Failing job vertex first
+ Collections.sort(execList, new Comparator<Execution>() {
+ @Override
+ public int compare(Execution o1, Execution o2) {
+ return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName());
+ }
+ });
+
+ int cnt = 0;
+ for (Execution e : execList) {
+ cnt++;
+ e.markFinished();
+ if(cnt <= 6) {
+ // the last execution of the first job vertex triggers the failing finalize hook
+ assertEquals(ExecutionState.FINISHED, e.getState());
+ } else {
+ // all following executions should be canceled
+ assertEquals(ExecutionState.CANCELED, e.getState());
+ }
+ }
+
+ assertEquals(0, executions.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private Map<ExecutionAttemptID, Execution> setupExecution(AbstractJobVertex v1, int dop1, AbstractJobVertex v2, int dop2) throws Exception {
final JobID jobId = new JobID();
- final JobVertexID jid1 = new JobVertexID();
- final JobVertexID jid2 = new JobVertexID();
-
- AbstractJobVertex v1 = new AbstractJobVertex("v1", jid1);
- AbstractJobVertex v2 = new AbstractJobVertex("v2", jid2);
-
v1.setParallelism(dop1);
v2.setParallelism(dop2);
@@ -269,4 +335,23 @@ public class ExecutionGraphDeploymentTest {
return executions;
}
+
+ @SuppressWarnings("serial")
+ public static class FailingFinalizeJobVertex extends AbstractJobVertex {
+
+ public FailingFinalizeJobVertex(String name) {
+ super(name);
+ }
+
+ public FailingFinalizeJobVertex(String name, JobVertexID id) {
+ super(name, id);
+ }
+
+ @Override
+ public void finalizeOnMaster(ClassLoader cl) throws Exception {
+ throw new Exception();
+ }
+
+
+ }
}
[2/2] incubator-flink git commit: [FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1
Posted by fh...@apache.org.
[FLINK-1139] Fixed HadoopOutputFormat to run with DOP > 1
This closes #173
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/15f58bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/15f58bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/15f58bb2
Branch: refs/heads/master
Commit: 15f58bb23c657951e2de48e6436820a093b393e7
Parents: 63eeafc
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Oct 6 16:28:00 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Dec 8 10:26:55 2014 +0100
----------------------------------------------------------------------
.../mapred/HadoopOutputFormat.java | 20 ++++++-
.../mapreduce/HadoopOutputFormat.java | 52 +++++++++++------
.../mapreduce/example/WordCount.java | 1 -
.../mapred/HadoopMapredITCase.java | 2 +-
.../mapreduce/HadoopInputOutputITCase.java | 3 +-
.../common/operators/GenericDataSinkBase.java | 12 ++++
.../flink/test/util/AbstractTestBase.java | 60 +++++++++++++++-----
7 files changed, 113 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index 434b409..64c539b 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
@@ -40,11 +41,11 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
private static final long serialVersionUID = 1L;
- private JobConf jobConf;
+ private JobConf jobConf;
private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
private transient RecordWriter<K,V> recordWriter;
private transient FileOutputCommitter fileOutputCommitter;
@@ -141,7 +142,20 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
this.fileOutputCommitter.commitTask(this.context);
}
- this.fileOutputCommitter.commitJob(this.jobContext);
+ }
+
+ @Override
+ public void finalizeGlobal(int parallelism) throws IOException {
+
+ try {
+ JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+ FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+
+ // finalize HDFS output format
+ fileOutputCommitter.commitJob(jobContext);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index fa8823b..402372c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -22,18 +22,17 @@ package org.apache.flink.hadoopcompatibility.mapreduce;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -41,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
+public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
private transient RecordWriter<K,V> recordWriter;
private transient FileOutputCommitter fileOutputCommitter;
private transient TaskAttemptContext context;
+ private transient int taskNumber;
public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
super();
@@ -95,6 +95,8 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
throw new IOException("Task id too large.");
}
+ this.taskNumber = taskNumber+1;
+
// for hadoop 2.2
this.configuration.set("mapreduce.output.basename", "tmp");
@@ -158,28 +160,42 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
this.fileOutputCommitter.commitTask(this.context);
}
- this.fileOutputCommitter.commitJob(this.context);
-
Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
- // rename tmp-* files to final name
+ // rename tmp-file to final name
FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
- final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
+ String taskNumberStr = Integer.toString(this.taskNumber);
+ String tmpFileTemplate = "tmp-r-00000";
+ String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
- // isDirectory does not work in hadoop 1
- if(fs.getFileStatus(outputPath).isDir()) {
- FileStatus[] files = fs.listStatus(outputPath);
+ if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+ fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+ }
+ }
+
+ @Override
+ public void finalizeGlobal(int parallelism) throws IOException {
+
+ JobContext jobContext;
+ TaskAttemptContext taskContext;
+ try {
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ + String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0")
+ + Integer.toString(1)
+ + "_0");
- for(FileStatus f : files) {
- Matcher m = p.matcher(f.getPath().getName());
- if(m.matches()) {
- int part = Integer.valueOf(m.group(2));
- fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
- }
- }
+ jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+ taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+ this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+
+ // finalize HDFS output format
+ this.fileOutputCommitter.commitJob(jobContext);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 53a6e6b..2b99fd2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -55,7 +55,6 @@ public class WordCount {
final String outputPath = args[1];
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(1);
// Set up the Hadoop Input Format
Job job = Job.getInstance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index f8c3bab..b6650d2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -36,7 +36,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+ compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 000dc58..7eee629 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -32,11 +32,12 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
protected void preSubmit() throws Exception {
textPath = createTempFile("text.txt", WordCountData.TEXT);
resultPath = getTempDirPath("result");
+ this.setDegreeOfParallelism(4);
}
@Override
protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+ compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
index ca55855..a0f367e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSinkBase.java
@@ -22,6 +22,8 @@ package org.apache.flink.api.common.operators;
import java.util.List;
import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -292,13 +294,23 @@ public class GenericDataSinkBase<IN> extends Operator<Nothing> {
protected void executeOnCollections(List<IN> inputData) throws Exception {
OutputFormat<IN> format = this.formatWrapper.getUserCodeObject();
+
+ // Collection execution runs a single instance of the format (parallelism 1), so the
+ // master-side hooks are invoked directly around that one instance:
+ // initializeGlobal(1) -> configure/open/write/close -> finalizeGlobal(1).
+ if (format instanceof InitializeOnMaster) {
+ ((InitializeOnMaster) format).initializeGlobal(1);
+ }
+
format.configure(this.parameters);
format.open(0, 1);
-for (IN element : inputData) {
-format.writeRecord(element);
-}
-format.close();
+ try {
+ for (IN element : inputData) {
+ format.writeRecord(element);
+ }
+ } catch (Exception e) {
+ // Release the format's resources even if writing fails, but never let a failing
+ // close() replace the original write error.
+ try {
+ format.close();
+ } catch (Throwable ignored) {
+ // best effort only: the write failure above is the root cause and is rethrown
+ }
+ throw e;
+ }
+
+ format.close();
+
+ // Only finalize globally after the (single) instance has been closed successfully.
+ if (format instanceof FinalizeOnMaster) {
+ ((FinalizeOnMaster) format).finalizeGlobal(1);
+ }
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/15f58bb2/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c043ea8..1fd0e6e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -199,11 +200,11 @@ public abstract class AbstractTestBase {
// --------------------------------------------------------------------------------------------
+ /** Returns readers over the result at {@code resultPath}; no files are excluded. */
public BufferedReader[] getResultReader(String resultPath) throws IOException {
- return getResultReader(resultPath, false);
+ return getResultReader(resultPath, new String[]{}, false);
}
- public BufferedReader[] getResultReader(String resultPath, boolean inOrderOfFiles) throws IOException {
- File[] files = getAllInvolvedFiles(resultPath);
+ /**
+ * Returns readers over the result at {@code resultPath}.
+ *
+ * @param resultPath path of a single result file or of a result directory
+ * @param excludePrefixes files of a result directory whose names start with one of these
+ *                        prefixes are skipped
+ * @param inOrderOfFiles if true, the files are sorted by name before the readers are created
+ */
+ public BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+ File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
if (inOrderOfFiles) {
// sort the files after their name (1, 2, 3, 4)...
@@ -232,8 +233,14 @@ public abstract class AbstractTestBase {
return readers;
}
+
+
+ /** Opens buffered streams over the result at {@code resultPath}; no files are excluded. */
public BufferedInputStream[] getResultInputStream(String resultPath) throws IOException {
- File[] files = getAllInvolvedFiles(resultPath);
+ return getResultInputStream(resultPath, new String[]{});
+ }
+
+ /**
+ * Opens one buffered stream per result file at {@code resultPath}, skipping files of a result
+ * directory whose names start with one of {@code excludePrefixes}. This method does not close
+ * the streams it opens.
+ */
+ public BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException {
+ File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
BufferedInputStream[] inStreams = new BufferedInputStream[files.length];
for (int i = 0; i < files.length; i++) {
inStreams[i] = new BufferedInputStream(new FileInputStream(files[i]));
@@ -242,11 +249,15 @@ public abstract class AbstractTestBase {
}
+ /** Appends all lines of the result at {@code resultPath} to {@code target}; no files are excluded. */
public void readAllResultLines(List<String> target, String resultPath) throws IOException {
- readAllResultLines(target, resultPath, false);
+ readAllResultLines(target, resultPath, new String[]{});
}
- public void readAllResultLines(List<String> target, String resultPath, boolean inOrderOfFiles) throws IOException {
- for (BufferedReader reader : getResultReader(resultPath, inOrderOfFiles)) {
+ /**
+ * Appends all lines of the result at {@code resultPath} to {@code target}, skipping files whose
+ * names start with one of {@code excludePrefixes}. Files are not read in any particular order.
+ * NOTE(review): the former overload (List, String, boolean) is removed by this change, so
+ * callers passing a boolean as third argument must switch to the four-argument variant.
+ */
+ public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes) throws IOException {
+ readAllResultLines(target, resultPath, excludePrefixes, false);
+ }
+
+ /**
+ * Appends all lines of the result at {@code resultPath} to {@code target}.
+ *
+ * @param excludePrefixes files whose names start with one of these prefixes are skipped
+ * @param inOrderOfFiles if true, the files are read in the order of their names
+ */
+ public void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+ for (BufferedReader reader : getResultReader(resultPath, excludePrefixes, inOrderOfFiles)) {
String s = null;
while ((s = reader.readLine()) != null) {
target.add(s);
@@ -255,8 +266,12 @@ public abstract class AbstractTestBase {
}
+ /** Compares the result lines with the expected lines, ignoring order; no files are excluded. */
public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
+ compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[]{});
+ }
+
+ /**
+ * Reads all result lines (skipping files whose names start with one of {@code excludePrefixes}),
+ * sorts them and asserts that they match the expected lines in count and content.
+ */
+ public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
- readAllResultLines(list, resultPath, false);
+ readAllResultLines(list, resultPath, excludePrefixes, false);
String[] result = (String[]) list.toArray(new String[list.size()]);
Arrays.sort(result);
@@ -267,10 +282,13 @@ public abstract class AbstractTestBase {
Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
Assert.assertArrayEquals(expected, result);
}
-
+ /** Compares the result lines with the expected lines in file order; no files are excluded. */
public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception {
+ compareResultsByLinesInMemoryWithStrictOrder(expectedResultStr, resultPath, new String[]{});
+ }
+
+ /**
+ * Reads the result files in the order of their names (skipping files whose names start with one
+ * of {@code excludePrefixes}) and compares the obtained lines with the expected ones.
+ */
+ public void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
- readAllResultLines(list, resultPath, true);
+ readAllResultLines(list, resultPath, excludePrefixes, true);
String[] result = (String[]) list.toArray(new String[list.size()]);
@@ -281,8 +299,12 @@ public abstract class AbstractTestBase {
}
+ /** Compares delimited key/value result lines with the expected ones; no files are excluded. */
public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception {
+ compareKeyValueParisWithDelta(expectedLines, resultPath, new String[]{}, delimiter, maxDelta);
+ }
+
+ /**
+ * Reads all result lines (skipping files whose names start with one of {@code excludePrefixes})
+ * and compares them with {@code expectedLines}, which are split on '\n' (an empty string means no
+ * expected lines). Presumably values are compared with tolerance {@code maxDelta} after splitting
+ * on {@code delimiter} -- TODO confirm against the rest of the method.
+ * NOTE(review): "Paris" looks like a typo for "Pairs"; renaming would break existing callers.
+ */
+ public void compareKeyValueParisWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
ArrayList<String> list = new ArrayList<String>();
- readAllResultLines(list, resultPath, false);
+ readAllResultLines(list, resultPath, excludePrefixes, false);
String[] result = (String[]) list.toArray(new String[list.size()]);
String[] expected = expectedLines.isEmpty() ? new String[0] : expectedLines.split("\n");
@@ -314,13 +336,25 @@ public abstract class AbstractTestBase {
}
}
- private File[] getAllInvolvedFiles(String resultPath) {
+ /**
+ * Returns the files that make up the result at {@code resultPath}: the path itself if it is a
+ * file, otherwise the entries of the directory whose names do not start with any of
+ * {@code excludePrefixes}. A {@code null} prefix array is treated as "exclude nothing".
+ * Fails the test if the result does not exist or the directory cannot be listed.
+ */
+ private File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) {
+ // Anonymous classes can only capture final locals before Java 8, hence this copy.
+ final String[] exPrefs = excludePrefixes == null ? new String[0] : excludePrefixes;
File result = asFile(resultPath);
if (!result.exists()) {
Assert.fail("Result file was not written");
}
if (result.isDirectory()) {
- return result.listFiles();
+ File[] files = result.listFiles(new FilenameFilter() {
+
+ @Override
+ public boolean accept(File dir, String name) {
+ for (String p : exPrefs) {
+ if (name.startsWith(p)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ });
+ // listFiles() returns null on an I/O error; fail clearly instead of an NPE in the callers.
+ if (files == null) {
+ Assert.fail("Could not list the files of result directory " + result);
+ }
+ return files;
} else {
return new File[] { result };
}