You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/21 14:11:27 UTC
[flink] 01/03: [hotfix][task] Remove unnecessary fields and methods
from StreamTask
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a1666e8678ca79bf57ab1279ea2dfc9f6f8e8a7
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Mar 19 17:21:50 2020 +0800
[hotfix][task] Remove unnecessary fields and methods from StreamTask
This closes #11459
---
.../streaming/api/operators/AbstractStreamOperator.java | 2 +-
.../apache/flink/streaming/runtime/tasks/StreamTask.java | 14 --------------
.../org/apache/flink/streaming/util/MockStreamTask.java | 10 ----------
.../apache/flink/streaming/util/MockStreamTaskBuilder.java | 5 -----
4 files changed, 1 insertion(+), 30 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f876d3c..532c57f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -227,7 +227,7 @@ public abstract class AbstractStreamOperator<OUT>
LatencyStats.Granularity.SINGLE);
}
- this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
+ this.runtimeContext = new StreamingRuntimeContext(this, environment, environment.getAccumulatorRegistry().getUserMap());
stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index aabd9c5..82651ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.Counter;
@@ -44,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -184,9 +182,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
- /** The map of user-defined accumulators of this task. */
- private final Map<String, Accumulator<?, ?>> accumulatorMap;
-
/** The currently active background materialization threads. */
private final CloseableRegistry cancelables = new CloseableRegistry();
@@ -272,7 +267,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
this.timerService = timerService;
this.uncaughtExceptionHandler = Preconditions.checkNotNull(uncaughtExceptionHandler);
this.configuration = new StreamConfig(getTaskConfiguration());
- this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
this.recordWriter = createRecordWriterDelegate(configuration, environment);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
@@ -687,18 +681,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return configuration;
}
- public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
- return accumulatorMap;
- }
-
public StreamStatusMaintainer getStreamStatusMaintainer() {
return operatorChain;
}
- public OperatorEventDispatcher getOperatorEventDispatcher() {
- return operatorChain.getOperatorEventDispatcher();
- }
-
RecordWriterOutput<?>[] getStreamOutputs() {
return operatorChain.getStreamOutputs();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
index ffd4a50..d190924 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTask.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
@@ -36,7 +35,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
-import java.util.Map;
import java.util.function.BiConsumer;
/**
@@ -54,7 +52,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
private final CheckpointStorageWorkerView checkpointStorage;
private final ProcessingTimeService processingTimeService;
private final BiConsumer<String, Throwable> handleAsyncException;
- private final Map<String, Accumulator<?, ?>> accumulatorMap;
public MockStreamTask(
Environment environment,
@@ -68,7 +65,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
CheckpointStorageWorkerView checkpointStorage,
TimerService timerService,
BiConsumer<String, Throwable> handleAsyncException,
- Map<String, Accumulator<?, ?>> accumulatorMap,
TaskMailbox taskMailbox,
StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor
) {
@@ -83,7 +79,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
this.checkpointStorage = checkpointStorage;
this.processingTimeService = timerService;
this.handleAsyncException = handleAsyncException;
- this.accumulatorMap = accumulatorMap;
}
@Override
@@ -159,11 +154,6 @@ public class MockStreamTask<OUT, OP extends StreamOperator<OUT>> extends StreamT
}
@Override
- public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
- return accumulatorMap;
- }
-
- @Override
public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
return mailboxExecutor -> processingTimeService;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
index 904c5e9..680fac0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamTaskBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.execution.Environment;
@@ -39,8 +38,6 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
-import java.util.Collections;
-import java.util.Map;
import java.util.function.BiConsumer;
/**
@@ -58,7 +55,6 @@ public class MockStreamTaskBuilder {
private TimerService timerService = new TestProcessingTimeService();
private StreamTaskStateInitializer streamTaskStateInitializer;
private BiConsumer<String, Throwable> handleAsyncException = (message, throwable) -> { };
- private Map<String, Accumulator<?, ?>> accumulatorMap = Collections.emptyMap();
private TaskMailbox taskMailbox = new TaskMailboxImpl();
private StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor taskActionExecutor = StreamTaskActionExecutor.synchronizedExecutor();
@@ -143,7 +139,6 @@ public class MockStreamTaskBuilder {
checkpointStorage,
timerService,
handleAsyncException,
- accumulatorMap,
taskMailbox,
taskActionExecutor);
}