You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/21 14:11:27 UTC

[flink] 01/03: [hotfix][task] Remove unnecessary fields and methods from StreamTask

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a1666e8678ca79bf57ab1279ea2dfc9f6f8e8a7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Mar 19 17:21:50 2020 +0800

    [hotfix][task] Remove unnecessary fields and methods from StreamTask
    
    This closes #11459
---
 .../streaming/api/operators/AbstractStreamOperator.java    |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   | 14 --------------
 .../org/apache/flink/streaming/util/MockStreamTask.java    | 10 ----------
 .../apache/flink/streaming/util/MockStreamTaskBuilder.java |  5 -----
 4 files changed, 1 insertion(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f876d3c..532c57f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -227,7 +227,7 @@ public abstract class AbstractStreamOperator<OUT>
 				LatencyStats.Granularity.SINGLE);
 		}
 
-		this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
+		this.runtimeContext = new StreamingRuntimeContext(this, environment, environment.getAccumulatorRegistry().getUserMap());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
 		stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabd9c5..82651ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.Counter;
@@ -44,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -184,9 +182,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
 
-	/** The map of user-defined accumulators of this task. */
-	private final Map<String, Accumulator<?, ?>> accumulatorMap;
-
 	/** The currently active background materialization threads. */
 	private final CloseableRegistry cancelables = new CloseableRegistry();
 
@@ -272,7 +267,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		this.timerService = timerService;
 		this.uncaughtExceptionHandler = Preconditions.checkNotNull(uncaughtExceptionHandler);
 		this.configuration = new StreamConfig(getTaskConfiguration());
-		this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
 		this.recordWriter = createRecordWriterDelegate(configuration, environment);
 		this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
 		this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
@@ -687,18 +681,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return configuration;
 	}
 
-	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
-		return accumulatorMap;
-	}
-
 	public StreamStatusMaintainer getStreamStatusMaintainer() {
 		return operatorChain;
 	}
 
-	public OperatorEventDispatcher getOperatorEventDispatcher() {
-		return operatorChain.getOperatorEventDispatcher();
-	}
-
 	RecordWriterOutput<?>[] getStreamOutputs() {
 		return operatorChain.getStreamOutputs();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index ffd4a50..d190924 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -36,7 +35,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 
-import java.util.Map;
 import java.util.function.BiConsumer;
 
 /**
@@ -54,7 +52,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 	private final CheckpointStorageWorkerView checkpointStorage;
 	private final ProcessingTimeService processingTimeService;
 	private final BiConsumer<String, Throwable> handleAsyncException;
-	private final Map<String, Accumulator<?, ?>> accumulatorMap;
 
 	public MockStreamTask(
 		Environment environment,
@@ -68,7 +65,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		CheckpointStorageWorkerView checkpointStorage,
 		TimerService timerService,
 		BiConsumer<String, Throwable> handleAsyncException,
-		Map<String, Accumulator<?, ?>> accumulatorMap,
 		TaskMailbox taskMailbox,
 		StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor
 	) {
@@ -83,7 +79,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 		this.checkpointStorage = checkpointStorage;
 		this.processingTimeService = timerService;
 		this.handleAsyncException = handleAsyncException;
-		this.accumulatorMap = accumulatorMap;
 	}
 
 	@Override
@@ -159,11 +154,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
 	}
 
 	@Override
-	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
-		return accumulatorMap;
-	}
-
-	@Override
 	public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
 		return mailboxExecutor -> processingTimeService;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
index 904c5e9..680fac0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.execution.Environment;
@@ -39,8 +38,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
-import java.util.Collections;
-import java.util.Map;
 import java.util.function.BiConsumer;
 
 /**
@@ -58,7 +55,6 @@ public class MockStreamTaskBuilder {
 	private TimerService timerService = new TestProcessingTimeService();
 	private StreamTaskStateInitializer streamTaskStateInitializer;
 	private BiConsumer<String, Throwable> handleAsyncException = (message, throwable) -> { };
-	private Map<String, Accumulator<?, ?>> accumulatorMap = Collections.emptyMap();
 	private TaskMailbox taskMailbox = new TaskMailboxImpl();
 	private StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor = StreamTaskActionExecutor.synchronizedExecutor();
 
@@ -143,7 +139,6 @@ public class MockStreamTaskBuilder {
 			checkpointStorage,
 			timerService,
 			handleAsyncException,
-			accumulatorMap,
 			taskMailbox,
 			taskActionExecutor);
 	}