You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/23 15:41:21 UTC

[3/5] flink git commit: [hotfix] [runtime] Remove unused and unnecessary method from StreamTask

[hotfix] [runtime] Remove unused and unnecessary method from StreamTask


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ee3c58ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ee3c58ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ee3c58ea

Branch: refs/heads/master
Commit: ee3c58ea65d770a7abc4ab0418351ca5e53bed29
Parents: b485f8c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 16 21:08:29 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed May 23 17:28:55 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 20 +-------------------
 1 file changed, 1 insertion(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ee3c58ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2cc8886..6ea442d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -41,13 +40,11 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
@@ -211,9 +208,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		this.timerService = timeProvider;
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.streamRecordWriters = createStreamRecordWriters(
-			configuration,
-			environment);
+		this.streamRecordWriters = createStreamRecordWriters(configuration, environment);
 	}
 
 	// ------------------------------------------------------------------------
@@ -541,10 +536,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return operatorChain;
 	}
 
-	Output<StreamRecord<OUT>> getHeadOutput() {
-		return operatorChain.getChainEntryPoint();
-	}
-
 	RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
@@ -750,15 +741,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return new CheckpointExceptionHandlerFactory();
 	}
 
-	private String createOperatorIdentifier(StreamOperator<?> operator) {
-		TaskInfo taskInfo = getEnvironment().getTaskInfo();
-		return new OperatorSubtaskDescriptionText(
-			operator.getOperatorID(),
-			operator.getClass().getSimpleName(),
-			taskInfo.getIndexOfThisSubtask(),
-			taskInfo.getNumberOfParallelSubtasks()).toString();
-	}
-
 	/**
 	 * Returns the {@link ProcessingTimeService} responsible for telling the current
 	 * processing time and registering timers.