Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:33 UTC
[06/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 80563b8..e131cda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -125,7 +124,8 @@ public class StreamInputProcessor<IN> {
lastEmittedWatermark = Long.MIN_VALUE;
}
- public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
if (isFinished) {
return false;
}
@@ -161,11 +161,8 @@ public class StreamInputProcessor<IN> {
} else {
// now we can do the actual processing
StreamRecord<IN> record = recordOrWatermark.asRecord();
- StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
synchronized (lock) {
- if (ctx != null) {
- ctx.setNextInput(record);
- }
+ streamOperator.setKeyContextElement(record);
streamOperator.processElement(record);
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
index 017c8ea..7020758 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -17,17 +17,16 @@
*/
package org.apache.flink.streaming.runtime.operators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -57,10 +56,9 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
}
@Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- buckets = Maps.newHashMap();
+ public void open() throws Exception {
+ super.open();
+ buckets = new HashMap<>();
}
@@ -70,7 +68,7 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
List<StreamRecord<T>> bucket = buckets.get(bucketId);
if (bucket == null) {
- bucket = Lists.newArrayList();
+ bucket = new ArrayList<>();
buckets.put(bucketId, bucket);
}
bucket.add(record);
@@ -79,7 +77,7 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
@Override
public void processWatermark(Watermark mark) throws Exception {
long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
- Set<Long> toRemove = Sets.newHashSet();
+ Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, List<StreamRecord<T>>> bucket: buckets.entrySet()) {
if (bucket.getKey() < maxBucketId) {
Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index ddfc6a1..6e51a49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -17,9 +17,9 @@
package org.apache.flink.streaming.runtime.operators;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,11 +46,11 @@ public class ExtractTimestampsOperator<T>
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+ public void open() throws Exception {
+ super.open();
+ watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
- getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+ registerTimer(System.currentTimeMillis() + watermarkInterval, this);
}
currentWatermark = Long.MIN_VALUE;
@@ -78,7 +78,7 @@ public class ExtractTimestampsOperator<T>
@Override
public void trigger(long timestamp) throws Exception {
// register next timer
- getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+ registerTimer(System.currentTimeMillis() + watermarkInterval, this);
long lastWatermark = currentWatermark;
currentWatermark = userFunction.getCurrentWatermark();
@@ -90,6 +90,6 @@ public class ExtractTimestampsOperator<T>
@Override
public void processWatermark(Watermark mark) throws Exception {
- // ingore them, since we are basically a watermark source
+ // ignore them, since we are basically a watermark source
}
}
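
The two hunks above illustrate the lifecycle change that runs through this commit: open() no longer receives a Configuration, and timers are registered through the operator itself instead of through StreamingRuntimeContext. As a rough sketch of a custom operator written against the refactored abstraction (the class name and interval below are hypothetical, and it is assumed that AbstractStreamOperator in this snapshot exposes the protected output field, the no-argument open(), and the registerTimer(long, Triggerable) call used in the diff):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.operators.Triggerable;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Hypothetical pass-through operator showing the open()/registerTimer()/trigger() pattern.
    public class PeriodicTriggerOperator<T> extends AbstractStreamOperator<T>
            implements OneInputStreamOperator<T, T>, Triggerable {

        private static final long serialVersionUID = 1L;

        private long interval;

        @Override
        public void open() throws Exception {
            super.open();                 // no Configuration parameter any more
            interval = 1000;              // assumed fixed interval, for illustration only
            registerTimer(System.currentTimeMillis() + interval, this);
        }

        @Override
        public void trigger(long timestamp) throws Exception {
            // re-arm the timer, as ExtractTimestampsOperator does above
            registerTimer(System.currentTimeMillis() + interval, this);
        }

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            output.collect(element);      // forward records unchanged
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            output.emitWatermark(mark);
        }
    }
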
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 4fcfb2c..227de49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -22,7 +22,6 @@ import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -105,8 +104,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
// ------------------------------------------------------------------------
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
out = new TimestampedCollector<>(output);
@@ -119,7 +118,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
nextEvaluationTime = now + windowSlide - (now % windowSlide);
nextSlideTime = now + paneSize - (now % paneSize);
- getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+ registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
}
@Override
@@ -188,7 +187,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
- getRuntimeContext().registerTimer(nextTriggerTime, this);
+ registerTimer(nextTriggerTime, this);
}
private void computeWindow(long timestamp) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index f35ffca..aecfd5d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,16 +18,15 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -39,10 +38,13 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -104,11 +106,11 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- windows = Maps.newHashMap();
- watermarkTimers = Maps.newHashMap();
- processingTimeTimers = Maps.newHashMap();
+ public void open() throws Exception {
+ super.open();
+ windows = new HashMap<>();
+ watermarkTimers = new HashMap<>();
+ processingTimeTimers = new HashMap<>();
timestampedCollector = new TimestampedCollector<>(output);
if (inputSerializer == null) {
@@ -116,7 +118,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
}
windowBufferFactory.setRuntimeContext(getRuntimeContext());
- windowBufferFactory.open(parameters);
+ windowBufferFactory.open(getUserFunctionParameters());
}
@Override
@@ -192,7 +194,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@Override
public void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = Sets.newHashSet();
+ Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
if (triggers.getKey() <= mark.getTimestamp()) {
@@ -212,7 +214,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@Override
public void trigger(long time) throws Exception {
- Set<Long> toRemove = Sets.newHashSet();
+ Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
if (triggers.getKey() < time) {
@@ -243,7 +245,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
Set<TriggerContext> triggers = processingTimeTimers.get(time);
if (triggers == null) {
getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
- triggers = Sets.newHashSet();
+ triggers = new HashSet<>();
processingTimeTimers.put(time, triggers);
}
triggers.add(this);
@@ -253,7 +255,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
public void registerWatermarkTimer(long time) {
Set<TriggerContext> triggers = watermarkTimers.get(time);
if (triggers == null) {
- triggers = Sets.newHashSet();
+ triggers = new HashSet<>();
watermarkTimers.put(time, triggers);
}
triggers.add(this);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index da36db1..82a3f9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,17 +18,16 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -44,6 +43,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -149,11 +150,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- windows = Maps.newHashMap();
- watermarkTimers = Maps.newHashMap();
- processingTimeTimers = Maps.newHashMap();
+ public void open() throws Exception {
+ super.open();
+ windows = new HashMap<>();
+ watermarkTimers = new HashMap<>();
+ processingTimeTimers = new HashMap<>();
timestampedCollector = new TimestampedCollector<>(output);
if (inputSerializer == null) {
@@ -161,7 +162,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
windowBufferFactory.setRuntimeContext(getRuntimeContext());
- windowBufferFactory.open(parameters);
+ windowBufferFactory.open(getUserFunctionParameters());
}
@Override
@@ -191,7 +192,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
if (keyWindows == null) {
- keyWindows = Maps.newHashMap();
+ keyWindows = new HashMap<>();
windows.put(key, keyWindows);
}
@@ -260,7 +261,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
@Override
public void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = Sets.newHashSet();
+ Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
if (triggers.getKey() <= mark.getTimestamp()) {
@@ -280,7 +281,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
@Override
public void trigger(long time) throws Exception {
- Set<Long> toRemove = Sets.newHashSet();
+ Set<Long> toRemove = new HashSet<>();
for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
if (triggers.getKey() < time) {
@@ -317,7 +318,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
Set<TriggerContext> triggers = processingTimeTimers.get(time);
if (triggers == null) {
getRuntimeContext().registerTimer(time, WindowOperator.this);
- triggers = Sets.newHashSet();
+ triggers = new HashSet<>();
processingTimeTimers.put(time, triggers);
}
triggers.add(this);
@@ -327,7 +328,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
public void registerWatermarkTimer(long time) {
Set<TriggerContext> triggers = watermarkTimers.get(time);
if (triggers == null) {
- triggers = Sets.newHashSet();
+ triggers = new HashSet<>();
watermarkTimers.put(time, triggers);
}
triggers.add(this);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
new file mode 100644
index 0000000..ec90bff
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special exception that signifies that the cause exception came from a chained operator.
+ */
+public class ExceptionInChainedOperatorException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public ExceptionInChainedOperatorException(Throwable cause) {
+ this("Could not forward element to next operator", cause);
+ }
+
+ public ExceptionInChainedOperatorException(String message, Throwable cause) {
+ super(message, requireNonNull(cause));
+ }
+
+ public Throwable getOriginalCause() {
+ Throwable ex = this;
+ do {
+ ex = ex.getCause();
+ } while (ex instanceof ExceptionInChainedOperatorException);
+ return ex;
+ }
+}
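
Since every chaining boundary rethrows failures as an ExceptionInChainedOperatorException, an error that occurs several operators down the chain arrives wrapped several levels deep; getOriginalCause() walks through those wrappers to the first non-chained cause. A hedged usage sketch (the surrounding variables and logger are hypothetical, only the exception type comes from this file):

    try {
        chainEntryPoint.collect(record);   // hypothetical call into an operator chain
    }
    catch (ExceptionInChainedOperatorException e) {
        // e.getCause() may itself be another ExceptionInChainedOperatorException if the
        // failure happened further down the chain; getOriginalCause() unwraps them all.
        Throwable userCodeFailure = e.getOriginalCause();
        LOG.error("User function failed inside the operator chain", userCodeFailure);
    }
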
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 89eac92..5316ae4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
@@ -32,6 +33,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
@Override
public void init() throws Exception {
+ StreamConfig configuration = getConfiguration();
+
TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
int numberOfInputs = configuration.getNumberOfInputs();
@@ -52,10 +55,13 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
@Override
protected void run() throws Exception {
- while (running && inputProcessor.processInput(streamOperator, lock)) {
- if (timerException != null) {
- throw timerException;
- }
+ // cache some references on the stack, to make the code more JIT friendly
+ final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
+ final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
+ final Object lock = getCheckpointLock();
+
+ while (running && inputProcessor.processInput(operator, lock)) {
+ checkTimerException();
}
}
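
The rewritten run() loop first copies the head operator, the input processor and the checkpoint lock into final local variables, so the hot loop reads only locals rather than re-loading instance fields on every iteration. The same idea in isolation (all names below are illustrative, not Flink classes):

    class HotLoopSketch {
        private volatile boolean running = true;
        private final java.util.List<String> elements = new java.util.ArrayList<>();

        void run() {
            // hoist the field read into a final local once, before the tight loop,
            // mirroring the rewritten OneInputStreamTask.run() above
            final java.util.List<String> local = this.elements;
            int i = 0;
            while (running && i < local.size()) {
                System.out.println(local.get(i++));   // the loop body touches only locals
            }
        }
    }
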
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
new file mode 100644
index 0000000..9df3a5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.CollectorWrapper;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OperatorChain<OUT> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
+
+ private final StreamOperator<?>[] allOperators;
+
+ private final RecordWriterOutput<?>[] streamOutputs;
+
+ private final Output<StreamRecord<OUT>> chainEntryPoint;
+
+
+ public OperatorChain(StreamTask<OUT, ?> containingTask,
+ StreamOperator<OUT> headOperator,
+ AccumulatorRegistry.Reporter reporter) {
+
+ final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+ final StreamConfig configuration = containingTask.getConfiguration();
+ final boolean enableTimestamps = containingTask.getExecutionConfig().areTimestampsEnabled();
+
+ // we read the chained configs, and the order of record writer registrations by output name
+ Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
+ chainedConfigs.put(configuration.getVertexID(), configuration);
+
+ // create the final output stream writers
+ // we iterate through all the out edges from this job vertex and create a stream output
+ List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
+ Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
+ this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
+
+ for (int i = 0; i < outEdgesInOrder.size(); i++) {
+ StreamEdge outEdge = outEdgesInOrder.get(i);
+
+ RecordWriterOutput<?> streamOutput = createStreamOutput(
+ outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
+ containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
+
+ streamOutputMap.put(outEdge, streamOutput);
+ this.streamOutputs[i] = streamOutput;
+ }
+
+ // we create the chain of operators and grab the collector that leads into the chain
+ List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
+ this.chainEntryPoint = createOutputCollector(containingTask, configuration,
+ chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
+
+ this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
+
+ // add the head operator to the end of the list
+ this.allOperators[this.allOperators.length - 1] = headOperator;
+ }
+
+ //
+
+ public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
+ }
+ }
+
+ public RecordWriterOutput<?>[] getStreamOutputs() {
+ return streamOutputs;
+ }
+
+ public StreamOperator<?>[] getAllOperators() {
+ return allOperators;
+ }
+
+ public Output<StreamRecord<OUT>> getChainEntryPoint() {
+ return chainEntryPoint;
+ }
+
+ /**
+ *
+ * This method should be called before finishing the record emission, to make sure any data
+ * that is still buffered will be sent. It also ensures that all data sending related
+ * exceptions are recognized.
+ *
+ * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
+ */
+ public void flushOutputs() throws IOException {
+ for (RecordWriterOutput<?> streamOutput : getStreamOutputs()) {
+ streamOutput.flush();
+ }
+ }
+
+ /**
+ * This method releases all resources of the record writer output. It stops the output
+ * flushing thread (if there is one) and releases all buffers currently held by the output
+ * serializers.
+ *
+ * <p>This method should never fail.
+ */
+ public void releaseOutputs() {
+ try {
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.close();
+ }
+ }
+ finally {
+ // make sure that we release the buffers in any case
+ for (RecordWriterOutput<?> output : streamOutputs) {
+ output.clearBuffers();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // initialization utilities
+ // ------------------------------------------------------------------------
+
+ private static <T> Output<StreamRecord<T>> createOutputCollector(
+ StreamTask<?, ?> containingTask,
+ StreamConfig operatorConfig,
+ Map<Integer, StreamConfig> chainedConfigs,
+ ClassLoader userCodeClassloader,
+ Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+ List<StreamOperator<?>> allOperators)
+ {
+ // We create a wrapper that will encapsulate the chained operators and network outputs
+ OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
+ CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
+
+ // create collectors for the network outputs
+ for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
+ @SuppressWarnings("unchecked")
+ RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
+ wrapper.addCollector(output, outputEdge);
+ }
+
+ // Create collectors for the chained outputs
+ for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
+ int outputId = outputEdge.getTargetId();
+ StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
+
+ Output<StreamRecord<T>> output = createChainedOperator(
+ containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+ wrapper.addCollector(output, outputEdge);
+ }
+ return wrapper;
+ }
+
+ private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
+ StreamTask<?, ?> containingTask,
+ StreamConfig operatorConfig,
+ Map<Integer, StreamConfig> chainedConfigs,
+ ClassLoader userCodeClassloader,
+ Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+ List<StreamOperator<?>> allOperators)
+ {
+ // create the output that the operator writes to first. this may recursively create more operators
+ Output<StreamRecord<OUT>> output = createOutputCollector(
+ containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+
+ // now create the operator and give it the output collector to write its output to
+ OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+ chainedOperator.setup(containingTask, operatorConfig, output);
+
+ allOperators.add(chainedOperator);
+
+ if (containingTask.getExecutionConfig().isObjectReuseEnabled() || chainedOperator.isInputCopyingDisabled()) {
+ return new ChainingOutput<IN>(chainedOperator);
+ }
+ else {
+ TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
+ return new CopyingChainingOutput<IN>(chainedOperator, inSerializer);
+ }
+ }
+
+ private static <T> RecordWriterOutput<T> createStreamOutput(
+ StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
+ Environment taskEnvironment, boolean withTimestamps,
+ AccumulatorRegistry.Reporter reporter, String taskName)
+ {
+ TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+
+ @SuppressWarnings("unchecked")
+ StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
+
+ LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
+
+ ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
+
+ StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
+ new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
+ output.setReporter(reporter);
+
+ return new RecordWriterOutput<T>(output, outSerializer, withTimestamps);
+ }
+
+ // ------------------------------------------------------------------------
+ // Collectors for output chaining
+ // ------------------------------------------------------------------------
+
+ private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
+
+ protected final OneInputStreamOperator<T, ?> operator;
+
+ public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
+ this.operator = operator;
+ }
+
+ @Override
+ public void collect(StreamRecord<T> record) {
+ try {
+ operator.setKeyContextElement(record);
+ operator.processElement(record);
+ }
+ catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ try {
+ operator.processWatermark(mark);
+ }
+ catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ operator.close();
+ }
+ catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+ }
+
+ private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
+
+ private final TypeSerializer<T> serializer;
+
+ private final StreamRecord<T> copyRecord;
+
+ public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
+ super(operator);
+ this.serializer = serializer;
+ this.copyRecord = new StreamRecord<T>(null, 0L);
+ }
+
+ @Override
+ public void collect(StreamRecord<T> record) {
+ try {
+ T copy = serializer.copy(record.getValue());
+ copyRecord.replace(copy, record.getTimestamp());
+
+ operator.setKeyContextElement(copyRecord);
+ operator.processElement(copyRecord);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Could not forward element to next operator", e);
+ }
+ }
+ }
+}
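
Whether a chained hand-off copies the record is decided in createChainedOperator() above: with object reuse enabled (or when the operator disables input copying), the same StreamRecord instance is passed along via ChainingOutput; otherwise CopyingChainingOutput copies the value through the operator's input serializer so that downstream mutation cannot corrupt upstream state. A job opts into the cheaper non-copying path through its ExecutionConfig; a minimal sketch using the public DataStream API (the map function is illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ObjectReuseExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // With object reuse on, chained operators receive the same record instance
            // (ChainingOutput); with it off, each hand-off goes through serializer.copy(...)
            // in CopyingChainingOutput.
            env.getConfig().enableObjectReuse();

            env.fromElements(1, 2, 3)
               .map(new MapFunction<Integer, Integer>() {
                   @Override
                   public Integer map(Integer value) {
                       return value * 2;   // chained to the source, so this hand-off uses the chosen output
                   }
               })
               .print();

            env.execute("object reuse example");
        }
    }
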
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
deleted file mode 100644
index ce659fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
- private final StreamTask<OUT, ?> vertex;
-
- /** The classloader used to access all user code */
- private final ClassLoader userCodeClassloader;
-
-
- private final Output<StreamRecord<OUT>> outerOutput;
-
- public final List<StreamOperator<?>> chainedOperators;
-
- private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
-
- private final Map<Integer, StreamConfig> chainedConfigs;
-
- /** Counters for the number of records emitted and bytes written. */
- protected final AccumulatorRegistry.Reporter reporter;
-
-
- public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
- AccumulatorRegistry.Reporter reporter) {
-
- // Initialize some fields
- this.vertex = vertex;
- StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
- this.chainedOperators = new ArrayList<StreamOperator<?>>();
- this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
- this.userCodeClassloader = vertex.getUserCodeClassLoader();
-
- // We read the chained configs, and the order of record writer
- // registrations by output name
- this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
- this.chainedConfigs.put(configuration.getVertexID(), configuration);
-
- List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
-
- this.reporter = reporter;
-
- // We iterate through all the out edges from this job vertex and create
- // a stream output
- for (StreamEdge outEdge : outEdgesInOrder) {
- RecordWriterOutput<?> streamOutput = createStreamOutput(
- outEdge,
- outEdge.getTargetId(),
- chainedConfigs.get(outEdge.getSourceId()),
- outEdgesInOrder.indexOf(outEdge),
- reporter);
- outputMap.put(outEdge, streamOutput);
- }
-
- // We create the outer output that will be passed to the first task
- // in the chain
- this.outerOutput = createChainedCollector(configuration, accumulatorMap);
-
- // Add the head operator to the end of the list
- this.chainedOperators.add(vertex.streamOperator);
- }
-
- public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
- for (RecordWriterOutput<?> streamOutput : outputMap.values()) {
- streamOutput.broadcastEvent(barrier);
- }
- }
-
- public Collection<RecordWriterOutput<?>> getOutputs() {
- return outputMap.values();
- }
-
- public List<StreamOperator<?>> getChainedOperators(){
- return chainedOperators;
- }
-
- /**
- * This method builds up a nested output which encapsulates all the
- * chained operators and their network output. The result of this recursive
- * call will be passed as output to the first operator in the chain.
- *
- * @param chainedTaskConfig
- * The configuration of the starting operator of the chain, we
- * use this paramater to recursively build the whole chain
- * @return Returns the output for the chain starting from the given
- * config
- */
- @SuppressWarnings("unchecked")
- private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-
- // We create a wrapper that will encapsulate the chained operators and
- // network outputs
-
- OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
- CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
-
- // Create collectors for the network outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
- Output<?> output = outputMap.get(outputEdge);
-
- wrapper.addCollector(output, outputEdge);
- }
-
- // Create collectors for the chained outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
- Integer outputId = outputEdge.getTargetId();
-
- Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
-
- wrapper.addCollector(output, outputEdge);
- }
-
- if (chainedTaskConfig.isChainStart()) {
- // The current task is the first chained task at this vertex so we
- // return the wrapper
- return (Output<StreamRecord<X>>) wrapper;
- }
- else {
- // The current task is a part of the chain so we get the chainable
- // operator which will be returned and set it up using the wrapper
- OneInputStreamOperator chainableOperator =
- chainedTaskConfig.getStreamOperator(userCodeClassloader);
-
- StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
- vertex.contexts.add(chainedContext);
-
- chainableOperator.setup(wrapper, chainedContext);
-
- chainedOperators.add(chainableOperator);
- if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
- return new ChainingOutput<X>(chainableOperator);
- }
- else {
- TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
- TypeSerializer<StreamRecord<X>> inSerializer;
-
- if (vertex.getExecutionConfig().areTimestampsEnabled()) {
- inSerializer = (TypeSerializer<StreamRecord<X>>)
- (TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
- }
- else {
- inSerializer = new StreamRecordSerializer<X>(typeSer);
- }
-
- return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
- }
- }
-
- }
-
- public Output<StreamRecord<OUT>> getOutput() {
- return outerOutput;
- }
-
- /**
- * We create the StreamOutput for the specific output given by the id, and
- * the configuration of its source task
- *
- * @param outputVertex
- * Name of the output to which the stream output will be set up
- * @param upStreamConfig
- * The config of upStream task
- * @return The created StreamOutput
- */
- private <T> RecordWriterOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
- StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) {
-
- TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut1(vertex.userClassLoader);
-
- @SuppressWarnings("unchecked")
- StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
- ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
-
- StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
- new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-
- output.setReporter(reporter);
-
- RecordWriterOutput<T> streamOutput =
- new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
- .getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
- }
-
- return streamOutput;
- }
-
- /**
- *
- * This method should be called before finishing the record emission, to make sure any data
- * that is still buffered will be sent. It also ensures that all data sending related
- * exceptions are recognized.
- *
- * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
- */
- public void flushOutputs() throws IOException {
- for (RecordWriterOutput<?> streamOutput : getOutputs()) {
- streamOutput.flush();
- }
- }
-
- /**
- * This method releases all resources of the record writer output. It stops the output
- * flushing thread (if there is one) and releases all buffers currently held by the output
- * serializers.
- *
- * This method should never fail.
- */
- public void releaseOutputs() {
- try {
- for (RecordWriterOutput<?> streamOutput : getOutputs()) {
- streamOutput.close();
- }
- }
- finally {
- // make sure that we release the buffers in any case
- for (RecordWriterOutput<?> output : getOutputs()) {
- output.clearBuffers();
- }
- }
- }
-
- private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-
- protected final OneInputStreamOperator<T, ?> operator;
-
- public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
- this.operator = operator;
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- try {
- operator.getRuntimeContext().setNextInput(record);
- operator.processElement(record);
- }
- catch (Exception e) {
- throw new RuntimeException("Could not forward element to next operator", e);
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- try {
- operator.processWatermark(mark);
- }
- catch (Exception e) {
- throw new RuntimeException("Could not forward watermark to next operator", e);
- }
- }
-
- @Override
- public void close() {
- try {
- operator.close();
- }
- catch (Exception e) {
- throw new RuntimeException("Could not close() call to next operator", e);
- }
- }
- }
-
- private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
- private final TypeSerializer<StreamRecord<T>> serializer;
-
- public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator,
- TypeSerializer<StreamRecord<T>> serializer) {
- super(operator);
- this.serializer = serializer;
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- try {
- operator.getRuntimeContext().setNextInput(record);
- operator.processElement(serializer.copy(record));
- }
- catch (Exception e) {
- throw new RuntimeException("Could not forward element to next operator", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fc221f8..3d82275 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -51,15 +51,13 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
@Override
protected void run() throws Exception {
final Object checkpointLock = getCheckpointLock();
-
- final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(outputHandler.getOutput(), checkpointLock);
-
- streamOperator.run(checkpointLock, output);
+ final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(getHeadOutput(), checkpointLock);
+ headOperator.run(checkpointLock, output);
}
@Override
protected void cancelTask() throws Exception {
- streamOperator.cancel();
+ headOperator.cancel();
}
// ------------------------------------------------------------------------
@@ -95,9 +93,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
@Override
public void collect(T record) {
synchronized (lockObject) {
- if (timerException != null) {
- throw timerException;
- }
+ checkTimerException();
output.collect(record);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index c937e51..2125df1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -41,7 +40,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
@Override
protected void run() throws Exception {
- final String iterationId = configuration.getIterationId();
+ final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
@@ -49,7 +48,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
getEnvironment().getIndexInSubtaskGroup());
- final long iterationWaitTime = configuration.getIterationWaitTime();
+ final long iterationWaitTime = getConfiguration().getIterationWaitTime();
final boolean shouldWait = iterationWaitTime > 0;
final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
@@ -61,8 +60,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
// do the work
try {
@SuppressWarnings("unchecked")
- Collection<RecordWriterOutput<OUT>> outputs =
- (Collection<RecordWriterOutput<OUT>>) (Collection<?>) outputHandler.getOutputs();
+ RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
// If timestamps are enabled we make sure to remove cyclic watermark dependencies
if (getExecutionConfig().areTimestampsEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index fdce52d..9bb5311 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -36,7 +36,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
public void init() throws Exception {
super.init();
- final String iterationId = configuration.getIterationId();
+ final String iterationId = getConfiguration().getIterationId();
if (iterationId == null || iterationId.length() == 0) {
throw new Exception("Missing iteration ID in the task configuration");
}
@@ -44,7 +44,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
getEnvironment().getIndexInSubtaskGroup());
- final long iterationWaitTime = configuration.getIterationWaitTime();
+ final long iterationWaitTime = getConfiguration().getIterationWaitTime();
LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
@@ -54,7 +54,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
- this.streamOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
+ this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
}
private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 16b8f55..bbfd233 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,121 +17,125 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.functors.NotNullPredicate;
-
import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.state.WrapperStateHandle;
-
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.StateBackendFactory;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * Base Invokable for all {@code StreamTasks}. A {@code StreamTask} processes input and forwards
- * elements and watermarks to a {@link StreamOperator}.
+ * Base class for all streaming tasks. A task is the unit of local processing that is deployed
+ * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
+ * the Task's operator chain. Operators that are chained together execute synchronously in the
+ * same thread and hence on the same stream partition. A common case for these chains
+ * is successive map/flatmap/filter tasks.
+ *
+ * <p>The task chain contains one "head" operator and multiple chained operators.
+ * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
+ * as well as for sources, iteration heads and iteration tails.
+ *
+ * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
+ * produced by the operators at the ends of the operator chain. Note that the chain may fork and
+ * thus have multiple ends.
*
+ * The life cycle of the task is set up as follows:
* <pre>
- *
* -- registerInputOutput()
* |
- * +----> Create basic utils (config, etc) and load operators
- * +----> operator specific init()
+ * +----> Create basic utils (config, etc) and load the chain of operators
+ * +----> operators.setup()
+ * +----> task specific init()
*
- * -- restoreState()
+ * -- restoreState() -> restores state of all operators in the chain
*
* -- invoke()
* |
- * +----> open operators()
+ * +----> open-operators()
* +----> run()
- * +----> close operators()
+ * +----> close-operators()
+ * +----> dispose-operators()
* +----> common cleanup
- * +----> operator specific cleanup()
+ * +----> task specific cleanup()
* </pre>
*
- * <p>
- * {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
+ * <p> The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
* {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
* are called concurrently.
*
* @param <OUT>
- * @param <O>
+ * @param <Operator>
*/
-public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
+public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
+ extends AbstractInvokable
+ implements StatefulTask<StreamTaskStateList> {
/** The thread group that holds all trigger timer threads */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
- private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
-
+ /** The logger used by the StreamTask and its subclasses */
+ protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+
+ // ------------------------------------------------------------------------
+
/**
* All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
- * we don't have concurrent method calls.
+ * we don't have concurrent method calls that void consistent checkpoints.
*/
- protected final Object lock = new Object();
-
- private final EventListener<CheckpointBarrier> checkpointBarrierListener;
+ private final Object lock = new Object();
- protected final List<StreamingRuntimeContext> contexts;
+ /** the head operator that consumes the input streams of this task */
+ protected Operator headOperator;
- protected StreamingRuntimeContext headContext;
+ /** The chain of operators executed by this task */
+ private OperatorChain<OUT> operatorChain;
- protected StreamConfig configuration;
-
- protected ClassLoader userClassLoader;
+ /** The configuration of this streaming task */
+ private StreamConfig configuration;
- /** The executor service that */
+ /** The class loader used to load dynamic classes of a job */
+ private ClassLoader userClassLoader;
+
+ /** The state backend that stores the state and checkpoints for this task */
+ private StateBackend<?> stateBackend;
+
+ /** The executor service that schedules and calls the triggers of this task*/
private ScheduledExecutorService timerService;
+
+ /** The map of user-defined accumulators of this task */
+ private Map<String, Accumulator<?, ?>> accumulatorMap;
- /**
- * This field is used to forward an exception that is caught in the timer thread. Subclasses
- * must ensure that exceptions stored here get thrown on the actual execution Thread.
- */
- protected volatile TimerException timerException = null;
-
- protected OutputHandler<OUT> outputHandler;
-
- protected O streamOperator;
-
- protected boolean hasChainedOperators;
-
+ /** This field is used to forward an exception that is caught in the timer thread. Subclasses
+ * must ensure that exceptions stored here get thrown on the actual execution Thread. */
+ private volatile TimerException timerException;
+
/** Flag to mark the task "in operation", in which case check
* needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
private volatile boolean isRunning;
- // ------------------------------------------------------------------------
-
- public StreamTask() {
- checkpointBarrierListener = new CheckpointBarrierListener();
- contexts = new ArrayList<>();
- }
// ------------------------------------------------------------------------
// Life cycle methods for specific implementations
@@ -152,34 +156,27 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
@Override
public final void registerInputOutput() throws Exception {
LOG.debug("Begin initialization for {}", getName());
+
+ AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
userClassLoader = getUserCodeClassLoader();
configuration = new StreamConfig(getTaskConfiguration());
-
- streamOperator = configuration.getStreamOperator(userClassLoader);
-
- // Create and register Accumulators
- AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
- Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
- AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
-
- outputHandler = new OutputHandler<>(this, accumulatorMap, reporter);
-
- if (streamOperator != null) {
- // IterationHead and IterationTail don't have an Operator...
-
- //Create context of the head operator
- headContext = createRuntimeContext(configuration, accumulatorMap);
- this.contexts.add(headContext);
- streamOperator.setup(outputHandler.getOutput(), headContext);
+ accumulatorMap = accumulatorRegistry.getUserMap();
+
+ stateBackend = createStateBackend();
+ stateBackend.initializeForJob(getEnvironment().getJobID());
+
+ headOperator = configuration.getStreamOperator(userClassLoader);
+ operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
+
+ if (headOperator != null) {
+ headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
}
- hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
-
- this.timerService = Executors.newSingleThreadScheduledExecutor(
+ timerService = Executors.newSingleThreadScheduledExecutor(
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
- // operator specific initialization
+ // task specific initialization
init();
LOG.debug("Finish initialization for {}", getName());
@@ -211,7 +208,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
closeAllOperators();
// make sure all data is flushed
- outputHandler.flushOutputs();
+ operatorChain.flushOutputs();
// make an attempt to dispose the operators such that failures in the dispose call
// still let the computation fail
@@ -224,8 +221,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
timerService.shutdown();
// release the output resources. this method should never fail.
- if (outputHandler != null) {
- outputHandler.releaseOutputs();
+ if (operatorChain != null) {
+ operatorChain.releaseOutputs();
}
// we must! perform this cleanup
@@ -252,41 +249,33 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
}
private void openAllOperators() throws Exception {
- for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
- if (operator != null) {
- operator.open(getTaskConfiguration());
- }
+ for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+ operator.open();
}
}
private void closeAllOperators() throws Exception {
// We need to close them first to last, since upstream operators in the chain might emit
// elements in their close methods.
- for (int i = outputHandler.getChainedOperators().size() - 1; i >= 0; i--) {
- StreamOperator<?> operator = outputHandler.getChainedOperators().get(i);
- if (operator != null) {
- operator.close();
- }
+ StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ for (int i = allOperators.length - 1; i >= 0; i--) {
+ allOperators[i].close();
}
}
private void tryDisposeAllOperators() throws Exception {
- for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
- if (operator != null) {
- operator.dispose();
- }
+ for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+ operator.dispose();
}
}
private void disposeAllOperators() {
- for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
- if (operator != null) {
- try {
- operator.dispose();
- }
- catch (Throwable t) {
- LOG.error("Error during disposal of stream operator.", t);
- }
+ for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+ try {
+ operator.dispose();
+ }
+ catch (Throwable t) {
+ LOG.error("Error during disposal of stream operator.", t);
}
}
}
@@ -300,8 +289,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
* shutdown is attempted, and cause threads to linger for longer than needed.
*/
@Override
- @SuppressWarnings("FinalizeDoesntCallSuperFinalize")
- protected void finalize() {
+ protected void finalize() throws Throwable {
+ super.finalize();
if (timerService != null) {
if (!timerService.isTerminated()) {
LOG.warn("Timer service was not shut down. Shutting down in finalize().");
@@ -322,73 +311,84 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
return getEnvironment().getTaskNameWithSubtasks();
}
+ /**
+ * Gets the lock object on which all operations that involve data and state mutation have to lock.
+ *
+ * @return The checkpoint lock object.
+ */
public Object getCheckpointLock() {
return lock;
}
+
+ public StreamConfig getConfiguration() {
+ return configuration;
+ }
+
+ public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
+ return accumulatorMap;
+ }
+
+ public Output<StreamRecord<OUT>> getHeadOutput() {
+ return operatorChain.getChainEntryPoint();
+ }
+
+ public RecordWriterOutput<?>[] getStreamOutputs() {
+ return operatorChain.getStreamOutputs();
+ }
// ------------------------------------------------------------------------
// Checkpoint and Restore
// ------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
+
@Override
- public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
-
- // We retrieve end restore the states for the chained operators.
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates =
- (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
-
- // We restore all stateful operators
- for (int i = 0; i < chainedStates.size(); i++) {
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
- // If state is not null we need to restore it
- if (state != null) {
- StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
- ((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
+ public void setInitialState(StreamTaskStateList initialState) throws Exception {
+ LOG.info("Restoring checkpointed state to task {}", getName());
+
+ final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ final StreamTaskState[] states = initialState.getState(userClassLoader);
+
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = states[i];
+ StreamOperator<?> operator = allOperators[i];
+
+ if (state != null && operator != null) {
+ LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
+ operator.restoreState(state);
+ }
+ else if (operator != null) {
+ LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
}
}
}
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
synchronized (lock) {
if (isRunning) {
- try {
- // We wrap the states of the chained operators in a list, marking non-stateful operators with null
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
- // A wrapper handle is created for the List of statehandles
- WrapperStateHandle stateHandle;
- try {
-
- // We construct a list of states for chained tasks
- for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
- if (chainedOperator instanceof StatefulStreamOperator) {
- chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
- .getStateSnapshotFromFunction(checkpointId, timestamp));
- }else{
- chainedStates.add(null);
- }
- }
-
- stateHandle = CollectionUtils.exists(chainedStates,
- NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
- }
- catch (Exception e) {
- throw new Exception("Error while drawing snapshot of the user state.", e);
+ // since both state checkpointing and downstream barrier emission occur in this
+ // lock scope, they are an atomic operation regardless of the order in which they occur
+ // we immediately emit the checkpoint barriers, so the downstream operators can start
+ // their checkpoint work as soon as possible
+ operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+
+ // now draw the state snapshot
+ try {
+ final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+ final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+
+ for (int i = 0; i < states.length; i++) {
+ StreamTaskState state = allOperators[i].snapshotOperatorState(checkpointId, timestamp);
+ states[i] = state.isEmpty() ? null : state;
}
- // now emit the checkpoint barriers
- outputHandler.broadcastBarrier(checkpointId, timestamp);
-
- // now confirm the checkpoint
- if (stateHandle == null) {
+ StreamTaskStateList allStates = new StreamTaskStateList(states);
+ if (allStates.isEmpty()) {
getEnvironment().acknowledgeCheckpoint(checkpointId);
} else {
- getEnvironment().acknowledgeCheckpoint(checkpointId, stateHandle);
+ getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
}
}
catch (Exception e) {
@@ -404,64 +404,85 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
public void notifyCheckpointComplete(long checkpointId) throws Exception {
synchronized (lock) {
if (isRunning) {
- for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
- if (chainedOperator instanceof StatefulStreamOperator) {
- ((StatefulStreamOperator<?>) chainedOperator).notifyCheckpointComplete(checkpointId);
- }
+ LOG.debug("Notification of complete checkpoint for task {}", getName());
+
+ for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+ operator.notifyOfCompletedCheckpoint(checkpointId);
}
}
+ else {
+ LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
+ }
}
}
// ------------------------------------------------------------------------
// State backend
// ------------------------------------------------------------------------
-
- private StateHandleProvider<Serializable> getStateHandleProvider() {
- StateHandleProvider<Serializable> provider = configuration.getStateHandleProvider(userClassLoader);
- // If the user did not specify a provider in the program we try to get it from the config
- if (provider == null) {
+ /**
+ * Gets the state backend used by this task. The state backend defines how to maintain the
+ * key/value state and how and where to store state snapshots.
+ *
+ * @return The state backend used by this task.
+ */
+ public StateBackend<?> getStateBackend() {
+ return stateBackend;
+ }
+
+ private StateBackend<?> createStateBackend() throws Exception {
+ StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
+
+ if (configuredBackend != null) {
+ // backend has been configured on the environment
+ LOG.info("Using user-defined state backend: " + configuredBackend);
+ return configuredBackend;
+ }
+ else {
+ // see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
- String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND,
- ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
-
- StateBackend backend;
-
- try {
- backend = StateBackend.valueOf(backendName);
- } catch (Exception e) {
- throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.");
+ String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+
+ if (backendName == null) {
+ LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
+ backendName = "jobmanager";
}
+
+ backendName = backendName.toLowerCase();
+ switch (backendName) {
+ case "jobmanager":
+ LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
+ return MemoryStateBackend.defaultInstance();
+
+ case "filesystem":
+ FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
+ LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
+ + backend.getBasePath() + "\")");
+ return backend;
+
+ default:
+ try {
+ @SuppressWarnings("rawtypes")
+ Class<? extends StateBackendFactory> clazz =
+ Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
- switch (backend) {
- case JOBMANAGER:
- LOG.info("State backend for state checkpoints is set to jobmanager.");
- return new LocalStateHandle.LocalStateHandleProvider<>();
- case FILESYSTEM:
- String checkpointDir = flinkConfig.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
- if (checkpointDir != null) {
- LOG.info("State backend for state checkpoints is set to filesystem with directory: "
- + checkpointDir);
- return FileStateHandle.createProvider(checkpointDir);
- } else {
- throw new RuntimeException(
- "For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\"");
+ return (StateBackend<?>) clazz.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
+ }
+ catch (ClassCastException e) {
+ throw new IllegalConfigurationException("The class configured under '" +
+ ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+ backendName + ')');
+ }
+ catch (Throwable t) {
+ throw new IllegalConfigurationException("Cannot create configured state backend", t);
}
- default:
- throw new RuntimeException("Backend " + backend + " is not supported yet.");
}
-
- } else {
- LOG.info("Using user defined state backend for streaming checkpoitns.");
- return provider;
}
}
- private enum StateBackend {
- JOBMANAGER, FILESYSTEM
- }
-
/**
* Registers a timer.
*/
@@ -473,41 +494,37 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
delay,
TimeUnit.MILLISECONDS);
}
+
+ public void checkTimerException() throws TimerException {
+ if (timerException != null) {
+ throw timerException;
+ }
+ }
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
-
- public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
- KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
-
- return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(),
- statePartitioner, getStateHandleProvider(), accumulatorMap, this);
- }
@Override
public String toString() {
return getName();
}
- // ------------------------------------------------------------------------
-
- public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
- return this.checkpointBarrierListener;
- }
-
- private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
-
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- try {
- triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
- }
- catch (Exception e) {
- throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+ protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
+ return new EventListener<CheckpointBarrier>() {
+ @Override
+ public void onEvent(CheckpointBarrier barrier) {
+ try {
+ triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+ }
}
- }
+ };
}
+
+ // ------------------------------------------------------------------------
/**
* Internal task that is invoked by the timer service and triggers the target.
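
The new createStateBackend() above resolves the backend in three steps: an explicitly configured backend on the job wins, otherwise the "state.backend" entry of the Flink configuration is consulted ("jobmanager", "filesystem", or a factory class name), and if nothing is set the memory backend is used. The following standalone sketch only mirrors that resolution order; the StateBackend and StateBackendFactory types and the configuration keys below are simplified illustrative stand-ins, not Flink's actual classes or constants.

import java.util.HashMap;
import java.util.Map;

public class BackendResolutionSketch {

    /** Simplified stand-in for a state backend. */
    interface StateBackend {}

    /** Simplified stand-in for a factory that builds a backend from the configuration. */
    interface StateBackendFactory {
        StateBackend createFromConfig(Map<String, String> config) throws Exception;
    }

    /** Heap-based backend: working state and snapshots stay in memory. */
    static class MemoryBackend implements StateBackend {}

    /** File-system backend: snapshots go to the given base path. */
    static class FsBackend implements StateBackend {
        final String basePath;
        FsBackend(String basePath) { this.basePath = basePath; }
    }

    /**
     * Resolves the backend the same way the task does: a known name selects a
     * built-in backend, anything else is treated as a factory class name.
     */
    static StateBackend resolve(Map<String, String> config, ClassLoader cl) throws Exception {
        String name = config.getOrDefault("state.backend", "jobmanager").toLowerCase();
        switch (name) {
            case "jobmanager":
                // heap state, snapshots sent to the checkpoint coordinator
                return new MemoryBackend();
            case "filesystem":
                // heap state, snapshots written to the configured directory
                return new FsBackend(config.get("state.backend.fs.dir"));
            default:
                // anything else is interpreted as a factory class on the user class loader
                Class<? extends StateBackendFactory> factoryClass =
                        Class.forName(name, false, cl).asSubclass(StateBackendFactory.class);
                return factoryClass.getDeclaredConstructor().newInstance().createFromConfig(config);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("state.backend", "filesystem");
        config.put("state.backend.fs.dir", "hdfs:///checkpoints");
        StateBackend backend = resolve(config, BackendResolutionSketch.class.getClassLoader());
        System.out.println("resolved backend: " + backend.getClass().getSimpleName());
    }
}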
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
new file mode 100644
index 0000000..2fce7af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+
+import java.io.Serializable;
+
+/**
+ * The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
+ * This state consists of any combination of those three:
+ * <ul>
+ * <li>The state of the stream operator, if it implements the Checkpointed interface.</li>
+ * <li>The state of the user function, if it implements the Checkpointed interface.</li>
+ * <li>The key/value state of the operator, if it executes on a KeyedDataStream.</li>
+ * </ul>
+ */
+public class StreamTaskState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private StateHandle<?> operatorState;
+
+ private StateHandle<Serializable> functionState;
+
+ private KvStateSnapshot<?, ?, ?> kvState;
+
+ // ------------------------------------------------------------------------
+
+ public StateHandle<?> getOperatorState() {
+ return operatorState;
+ }
+
+ public void setOperatorState(StateHandle<?> operatorState) {
+ this.operatorState = operatorState;
+ }
+
+ public StateHandle<Serializable> getFunctionState() {
+ return functionState;
+ }
+
+ public void setFunctionState(StateHandle<Serializable> functionState) {
+ this.functionState = functionState;
+ }
+
+ public KvStateSnapshot<?, ?, ?> getKvState() {
+ return kvState;
+ }
+
+ public void setKvState(KvStateSnapshot<?, ?, ?> kvState) {
+ this.kvState = kvState;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Checks if this state object actually contains any state, or if all of the state
+ * fields are null.
+ *
+ * @return True, if all state is null, false if at least one state is not null.
+ */
+ public boolean isEmpty() {
+ return operatorState == null & functionState == null & kvState == null;
+ }
+
+ /**
+ * Discards all the contained states and sets them to null.
+ *
+ * @throws Exception Forwards exceptions that occur when releasing the
+ * state handles and snapshots.
+ */
+ public void discardState() throws Exception {
+ StateHandle<?> operatorState = this.operatorState;
+ StateHandle<?> functionState = this.functionState;
+ KvStateSnapshot<?, ?, ?> kvState = this.kvState;
+
+ if (operatorState != null) {
+ operatorState.discardState();
+ }
+ if (functionState != null) {
+ functionState.discardState();
+ }
+ if (kvState != null) {
+ kvState.discardState();
+ }
+
+ this.operatorState = null;
+ this.functionState = null;
+ this.kvState = null;
+ }
+}
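
The container above deliberately allows every field to be null, so a chain of mostly stateless operators contributes almost nothing to a checkpoint: snapshots that report isEmpty() are stored as null in the per-chain array, and discardState() only releases the handles that are actually set. The following is a minimal standalone sketch of that usage pattern under simplified assumptions; StateHandle and TaskState here are illustrative stand-ins, not the Flink classes.

import java.util.ArrayList;
import java.util.List;

public class TaskStateSketch {

    /** Simplified stand-in for a state handle that owns checkpointed data. */
    interface StateHandle {
        void discardState() throws Exception;
    }

    /** Simplified analogue of StreamTaskState: every field is optional. */
    static class TaskState {
        StateHandle operatorState;
        StateHandle functionState;
        StateHandle kvState;

        boolean isEmpty() {
            return operatorState == null && functionState == null && kvState == null;
        }

        void discardState() throws Exception {
            if (operatorState != null) operatorState.discardState();
            if (functionState != null) functionState.discardState();
            if (kvState != null) kvState.discardState();
            operatorState = null;
            functionState = null;
            kvState = null;
        }
    }

    public static void main(String[] args) throws Exception {
        // one slot per operator in the chain; empty snapshots are kept as null
        List<TaskState> chainStates = new ArrayList<>();

        TaskState stateful = new TaskState();
        stateful.functionState = () -> System.out.println("function state released");
        chainStates.add(stateful.isEmpty() ? null : stateful);

        TaskState stateless = new TaskState();
        chainStates.add(stateless.isEmpty() ? null : stateless);   // stays null

        // discarding a checkpoint releases the handles of every non-empty slot
        for (TaskState state : chainStates) {
            if (state != null) {
                state.discardState();
            }
        }
    }
}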