You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:51 UTC
[55/82] [abbrv] incubator-flink git commit: Removed JobStatusListener
and ExecutionListener. Fixed LocalExecutor output for maven verify.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9113fcd..9a0479f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -55,7 +55,11 @@ public final class BlobClient implements Closeable {
public BlobClient(final InetSocketAddress serverAddress) throws IOException {
this.socket = new Socket();
- this.socket.connect(serverAddress);
+ try {
+ this.socket.connect(serverAddress);
+ }catch(IOException e){
+ throw new IOException("Could not connect to BlobServer at address " + serverAddress, e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index de22e0f..0b02acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -101,15 +101,21 @@ public final class BlobServer extends Thread implements BlobService{
*/
public BlobServer() throws IOException {
- this.serverSocket = new ServerSocket(0);
- start();
+ try {
+ this.serverSocket = new ServerSocket(0);
+
+ start();
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info(String.format("Started BLOB server on port %d",
+ this.serverSocket.getLocalPort()));
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Started BLOB server on port %d",
- this.serverSocket.getLocalPort()));
+ this.storageDir = BlobUtils.initStorageDirectory();
+ }catch(IOException e){
+ throw new IOException("Could not create BlobServer with random port.", e);
}
- this.storageDir = BlobUtils.initStorageDirectory();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
deleted file mode 100644
index bd2e118..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-
-/**
- * Implementing this interface allows classes to receive notifications about
- * changes of a task's execution state.
- */
-public interface ExecutionListener {
-
- void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId,
- ExecutionState newExecutionState, String optionalMessage);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 915140c..8faa235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
@@ -97,10 +96,6 @@ public class ExecutionGraph {
private final List<BlobKey> requiredJarFiles;
- private final List<JobStatusListener> jobStatusListeners;
-
- private final List<ExecutionListener> executionListeners;
-
private final List<ActorRef> jobStatusListenerActors;
private final List<ActorRef> executionListenerActors;
@@ -150,8 +145,6 @@ public class ExecutionGraph {
this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
- this.jobStatusListeners = new CopyOnWriteArrayList<JobStatusListener>();
- this.executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorRef>();
this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
@@ -638,14 +631,6 @@ public class ExecutionGraph {
// Listeners & Observers
// --------------------------------------------------------------------------------------------
- public void registerJobStatusListener(JobStatusListener jobStatusListener) {
- this.jobStatusListeners.add(jobStatusListener);
- }
-
- public void registerExecutionListener(ExecutionListener executionListener) {
- this.executionListeners.add(executionListener);
- }
-
public void registerJobStatusListener(ActorRef listener){
this.jobStatusListenerActors.add(listener);
@@ -662,20 +647,6 @@ public class ExecutionGraph {
* @param error
*/
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
- if (jobStatusListeners.size() > 0) {
-
- String message = error == null ? null : ExceptionUtils.stringifyException(error);
-
- for (JobStatusListener listener : this.jobStatusListeners) {
- try {
- listener.jobStatusHasChanged(this, newState, message);
- }
- catch (Throwable t) {
- LOG.error("Notification of job status change caused an error.", t);
- }
- }
- }
-
if(jobStatusListenerActors.size() > 0){
String message = error == null ? null : ExceptionUtils.stringifyException(error);
for(ActorRef listener: jobStatusListenerActors){
@@ -696,17 +667,6 @@ public class ExecutionGraph {
*/
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
newExecutionState, Throwable error) {
- if(executionListeners.size() >0){
- String message = error == null ? null : ExceptionUtils.stringifyException(error);
- for (ExecutionListener listener : this.executionListeners) {
- try {
- listener.executionStateChanged(jobID, vertexId, subtask,executionID, newExecutionState, message);
- }catch(Throwable t){
- LOG.error("Notification of execution state change caused an error.");
- }
- }
- }
-
ExecutionJobVertex vertex = getJobVertex(vertexId);
if(executionListenerActors.size() >0){
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
deleted file mode 100644
index b06d2b2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-/**
- * This interface allows objects to receive notifications when the status of an observed job has changed.
- */
-public interface JobStatusListener {
-
- /**
- * Called when the status of the job changed.
- *
- * @param executionGraph The executionGraph representing the job.
- * @param newJobStatus The new job status.
- * @param optionalMessage An optional message (possibly <code>null</code>) that can be attached to the state change.
- */
- void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage);
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
index a7f1bd2..709a05c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/TaskManagerProfiler.java
@@ -29,23 +29,24 @@ import org.apache.flink.runtime.taskmanager.Task;
public interface TaskManagerProfiler {
/**
- * Registers an {@link org.apache.flink.runtime.execution.ExecutionListener} object for profiling.
+ * Registers a {@link org.apache.flink.runtime.taskmanager.Task} object for profiling.
*
* @param task
* task to be register a profiling listener for
* @param jobConfiguration
* the job configuration sent with the task
*/
- void registerExecutionListener(Task task, Configuration jobConfiguration);
+ void registerTask(Task task, Configuration jobConfiguration);
/**
- * Unregisters all previously register {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
- * the vertex identified by the given ID.
+ * Unregisters all previously registered {@link org.apache.flink.runtime.taskmanager.Task}
+ * objects for the vertex identified by the given ID.
*
* @param id
- * the ID of the vertex to unregister the {@link org.apache.flink.runtime.execution.ExecutionListener} objects for
+ * the ID of the vertex to unregister the
+ * {@link org.apache.flink.runtime.taskmanager.Task} objects for
*/
- void unregisterExecutionListener(ExecutionAttemptID id);
+ void unregisterTask(ExecutionAttemptID id);
/**
* Shuts done the task manager's profiling component
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1903a3c..8570992 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.RuntimeEnvironment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -62,9 +61,6 @@ public final class Task {
private final String taskName;
private final TaskManager taskManager;
-
-
- private final List<ExecutionListener> executionListeners = new CopyOnWriteArrayList<ExecutionListener>();
private final List<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
@@ -354,7 +350,7 @@ public final class Task {
* the configuration attached to the job
*/
public void registerProfiler(TaskManagerProfiler taskManagerProfiler, Configuration jobConfiguration) {
- taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
+ taskManagerProfiler.registerTask(this, jobConfiguration);
}
/**
@@ -365,7 +361,7 @@ public final class Task {
*/
public void unregisterProfiler(TaskManagerProfiler taskManagerProfiler) {
if (taskManagerProfiler != null) {
- taskManagerProfiler.unregisterExecutionListener(this.executionId);
+ taskManagerProfiler.unregisterTask(this.executionId);
}
}
@@ -373,24 +369,10 @@ public final class Task {
// State Listeners
// --------------------------------------------------------------------------------------------
- public void registerExecutionListener(ExecutionListener listener) {
- if (listener == null) {
- throw new IllegalArgumentException();
- }
- this.executionListeners.add(listener);
- }
-
public void registerExecutionListener(ActorRef listener){
executionListenerActors.add(listener);
}
- public void unregisterExecutionListener(ExecutionListener listener) {
- if (listener == null) {
- throw new IllegalArgumentException();
- }
- this.executionListeners.remove(listener);
- }
-
public void unregisterExecutionListener(ActorRef listener){
executionListenerActors.remove(listener);
}
@@ -400,15 +382,6 @@ public final class Task {
LOG.info(getTaskNameWithSubtasks() + " switched to " + newState + (message == null ? "" : " : " + message));
}
- for (ExecutionListener listener : this.executionListeners) {
- try {
- listener.executionStateChanged(jobId, vertexId, subtaskIndex, executionId, newState, message);
- }
- catch (Throwable t) {
- LOG.error("Error while calling execution listener.", t);
- }
- }
-
for(ActorRef listener: executionListenerActors){
listener.tell(new ExecutionGraphMessages.ExecutionStateChanged(
jobId, vertexId, taskName, numberOfSubtasks, subtaskIndex,