You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:51 UTC

[55/82] [abbrv] incubator-flink git commit: Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9113fcd..9a0479f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -55,7 +55,11 @@ public final class BlobClient implements Closeable {
 	public BlobClient(final InetSocketAddress serverAddress) throws IOException {
 
 		this.socket = new Socket();
-		this.socket.connect(serverAddress);
+		try {
+			this.socket.connect(serverAddress);
+		}catch(IOException e){
+			throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index de22e0f..0b02acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -101,15 +101,21 @@ public final class BlobServer extends Thread implements BlobService{
 	 */
 	public BlobServer() throws IOException {
 
-		this.serverSocket = new ServerSocket(0);
-		start();
+		try {
+			this.serverSocket = new ServerSocket(0);
+
+			start();
+
+			if (LOG.isInfoEnabled()) {
+				LOG.info(String.format("Started BLOB server on port %d",
+						this.serverSocket.getLocalPort()));
+			}
 
-		if (LOG.isInfoEnabled()) {
-			LOG.info(String.format("Started BLOB server on port %d",
-				this.serverSocket.getLocalPort()));
+			this.storageDir = BlobUtils.initStorageDirectory();
+		}catch(IOException e){
+			throw new IOException("Could not create BlobServer with random port.", e);
 		}
 
-		this.storageDir = BlobUtils.initStorageDirectory();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
deleted file mode 100644
index bd2e118..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Implementing this interface allows classes to receive notifications about
- * changes of a task's execution state.
- */
-public interface ExecutionListener {
-
-	void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
-			ExecutionState newExecutionState, String optionalMessage);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 915140c..8faa235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -97,10 +96,6 @@ public class ExecutionGraph {
 	
 	private final List<BlobKey> requiredJarFiles;
 	
-	private final List<JobStatusListener> jobStatusListeners;
-	
-	private final List<ExecutionListener> executionListeners;
-
 	private final List<ActorRef> jobStatusListenerActors;
 
 	private final List<ActorRef> executionListenerActors;
@@ -150,8 +145,6 @@ public class ExecutionGraph {
 		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
 		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
 		
-		this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
-		this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
 		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorRef>();
 		this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
 		
@@ -638,14 +631,6 @@ public class ExecutionGraph {
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
 	
-	public void registerJobStatusListener(JobStatusListener jobStatusListener) {
-		this.jobStatusListeners.add(jobStatusListener);
-	}
-	
-	public void registerExecutionListener(ExecutionListener executionListener) {
-		this.executionListeners.add(executionListener);
-	}
-
 	public void registerJobStatusListener(ActorRef listener){
 		this.jobStatusListenerActors.add(listener);
 
@@ -662,20 +647,6 @@ public class ExecutionGraph {
 	 * @param error
 	 */
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
-		if (jobStatusListeners.size() > 0) {
-			
-			String message = error == null ? null : ExceptionUtils.stringifyException(error);
-		
-			for (JobStatusListener listener : this.jobStatusListeners) {
-				try {
-					listener.jobStatusHasChanged(this, newState, message);
-				}
-				catch (Throwable t) {
-					LOG.error("Notification of job status change caused an error.", t);
-				}
-			}
-		}
-
 		if(jobStatusListenerActors.size() > 0){
 			String message = error == null ? null : ExceptionUtils.stringifyException(error);
 			for(ActorRef listener: jobStatusListenerActors){
@@ -696,17 +667,6 @@ public class ExecutionGraph {
 	 */
 	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
 							newExecutionState, Throwable error) {
-		if(executionListeners.size() >0){
-			String message = error == null ? null : ExceptionUtils.stringifyException(error);
-			for (ExecutionListener listener : this.executionListeners) {
-				try {
-					listener.executionStateChanged(jobID, vertexId, subtask,executionID, newExecutionState, message);
-				}catch(Throwable t){
-					LOG.error("Notification of execution state change caused an error.");
-				}
-			}
-		}
-
 		ExecutionJobVertex vertex = getJobVertex(vertexId);
 
 		if(executionListenerActors.size() >0){

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
deleted file mode 100644
index b06d2b2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-/**
- * This interface allows objects to receive notifications when the status of an observed job has changed.
- */
-public interface JobStatusListener {
-
-	/**
-	 * Called when the status of the job changed.
-	 * 
-	 * @param executionGraph   The executionGraph representing the job.
-	 * @param newJobStatus     The new job status.
-	 * @param optionalMessage  An optional message (possibly <code>null</code>) that can be attached to the state change.
-	 */
-	void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
index a7f1bd2..709a05c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
@@ -29,23 +29,24 @@ import org.apache.flink.runtime.taskmanager.Task;
 public interface TaskManagerProfiler {
 
 	/**
-	 * Registers an {@link org.apache.flink.runtime.execution.ExecutionListener} object for profiling.
+	 * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object for profiling.
 	 * 
 	 * @param task
 	 *        task to be register a profiling listener for
 	 * @param jobConfiguration
 	 *        the job configuration sent with the task
 	 */
-	void registerExecutionListener(Task task, Configuration jobConfiguration);
+	void registerTask(Task task, Configuration jobConfiguration);
 
 	/**
-	 * Unregisters all previously register {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
-	 * the vertex identified by the given ID.
+	 * Unregisters all previously registered {@link org.apache.flink.runtime.taskmanager.Task}
+	 * objects for the vertex identified by the given ID.
 	 * 
 	 * @param id
-	 *        the ID of the vertex to unregister the {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
+	 *        the ID of the vertex to unregister the
+	 *        {@link org.apache.flink.runtime.taskmanager.Task} objects for
 	 */
-	void unregisterExecutionListener(ExecutionAttemptID id);
+	void unregisterTask(ExecutionAttemptID id);
 
 	/**
 	 * Shuts done the task manager's profiling component

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1903a3c..8570992 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.ExecutionListener;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -62,9 +61,6 @@ public final class Task {
 	private final String taskName;
 
 	private final TaskManager taskManager;
-	
-	
-	private final List<ExecutionListener> executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
 
 	private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
 
@@ -354,7 +350,7 @@ public final class Task {
 	 *        the configuration attached to the job
 	 */
 	public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) {
-		taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
+		taskManagerProfiler.registerTask(this, jobConfiguration);
 	}
 
 	/**
@@ -365,7 +361,7 @@ public final class Task {
 	 */
 	public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
 		if (taskManagerProfiler != null) {
-			taskManagerProfiler.unregisterExecutionListener(this.executionId);
+			taskManagerProfiler.unregisterTask(this.executionId);
 		}
 	}
 	
@@ -373,24 +369,10 @@ public final class Task {
 	//                                     State Listeners
 	// --------------------------------------------------------------------------------------------
 	
-	public void registerExecutionListener(ExecutionListener listener) {
-		if (listener == null) {
-			throw new IllegalArgumentException();
-		}
-		this.executionListeners.add(listener);
-	}
-
 	public void registerExecutionListener(ActorRef listener){
 		executionListenerActors.add(listener);
 	}
 
-	public void unregisterExecutionListener(ExecutionListener listener) {
-		if (listener == null) {
-			throw new IllegalArgumentException();
-		}
-		this.executionListeners.remove(listener);
-	}
-
 	public void unregisterExecutionListener(ActorRef listener){
 		executionListenerActors.remove(listener);
 	}
@@ -400,15 +382,6 @@ public final class Task {
 			LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message));
 		}
 		
-		for (ExecutionListener listener : this.executionListeners) {
-			try {
-				listener.executionStateChanged(jobId, vertexId, subtaskIndex, executionId, newState, message);
-			}
-			catch (Throwable t) {
-				LOG.error("Error while calling execution listener.", t);
-			}
-		}
-
 		for(ActorRef listener: executionListenerActors){
 			listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(
 							jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex,