You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:46 UTC
[30/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
deleted file mode 100644
index 80df72e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * An element in a data stream. Can be a record or a Watermark.
- */
-public abstract class StreamElement {
-
- /**
- * Checks whether this element is a watermark.
- * @return True, if this element is a watermark, false otherwise.
- */
- public final boolean isWatermark() {
- return getClass() == Watermark.class;
- }
-
- /**
- * Checks whether this element is a record.
- * @return True, if this element is a record, false otherwise.
- */
- public final boolean isRecord() {
- return getClass() == StreamRecord.class;
- }
-
- /**
- * Casts this element into a StreamRecord.
- * @return This element as a stream record.
- * @throws java.lang.ClassCastException Thrown, if this element is actually not a stream record.
- */
- @SuppressWarnings("unchecked")
- public final <E> StreamRecord<E> asRecord() {
- return (StreamRecord<E>) this;
- }
-
- /**
- * Casts this element into a Watermark.
- * @return This element as a Watermark.
- * @throws java.lang.ClassCastException Thrown, if this element is actually not a Watermark.
- */
- public final Watermark asWatermark() {
- return (Watermark) this;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
deleted file mode 100644
index 348b974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-/**
- * One value in a data stream. This stores the value and the associated timestamp.
- *
- * @param <T> The type encapsulated with the stream record.
- */
-public class StreamRecord<T> extends StreamElement {
-
- /** The actual value held by this record */
- private T value;
-
- /** The timestamp of the record */
- private long timestamp;
-
- /**
- * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
- * result of {@code new Instant(0)}.
- */
- public StreamRecord(T value) {
- this(value, Long.MIN_VALUE + 1);
- // be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the
- // special tag to signify that a transmitted element is a Watermark in StreamRecordSerializer
- }
-
- /**
- * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
- * given timestamp.
- *
- * @param value The value to wrap in this {@link StreamRecord}
- * @param timestamp The timestamp in milliseconds
- */
- public StreamRecord(T value, long timestamp) {
- this.value = value;
- this.timestamp = timestamp;
- }
-
- /**
- * Returns the value wrapped in this stream value.
- */
- public T getValue() {
- return value;
- }
-
- /**
- * Returns the timestamp associated with this stream value in milliseconds.
- */
- public long getTimestamp() {
- return timestamp;
- }
-
- /**
- * Replace the currently stored value by the given new value. This returns a StreamElement
- * with the generic type parameter that matches the new value while keeping the old
- * timestamp.
- *
- * @param element Element to set in this stream value
- * @return Returns the StreamElement with replaced value
- */
- @SuppressWarnings("unchecked")
- public <X> StreamRecord<X> replace(X element) {
- this.value = (T) element;
- return (StreamRecord<X>) this;
- }
-
- /**
- * Replace the currently stored value by the given new value and the currently stored
- * timestamp with the new timestamp. This returns a StreamElement with the generic type
- * parameter that matches the new value.
- *
- * @param value The new value to wrap in this {@link StreamRecord}
- * @param timestamp The new timestamp in milliseconds
- * @return Returns the StreamElement with replaced value
- */
- @SuppressWarnings("unchecked")
- public <X> StreamRecord<X> replace(X value, long timestamp) {
- this.timestamp = timestamp;
- this.value = (T) value;
- return (StreamRecord<X>) this;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StreamRecord<?> that = (StreamRecord<?>) o;
-
- return value.equals(that.value) && timestamp == that.timestamp;
- }
-
- @Override
- public int hashCode() {
- int result = value != null ? value.hashCode() : 0;
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "Record{" + value + "; " + timestamp + '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index d47da50..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
- * the element.
- *
- * <p>
- * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
- private static final long serialVersionUID = 1L;
-
- private final TypeSerializer<T> typeSerializer;
-
-
- public StreamRecordSerializer(TypeSerializer<T> serializer) {
- if (serializer instanceof StreamRecordSerializer) {
- throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
- }
- this.typeSerializer = Preconditions.checkNotNull(serializer);
- }
-
- public TypeSerializer<T> getContainedTypeSerializer() {
- return this.typeSerializer;
- }
-
- // ------------------------------------------------------------------------
- // General serializer and type utils
- // ------------------------------------------------------------------------
-
- @Override
- public StreamRecordSerializer<T> duplicate() {
- TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
- return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public int getLength() {
- return typeSerializer.getLength();
- }
-
- // ------------------------------------------------------------------------
- // Type serialization, copying, instantiation
- // ------------------------------------------------------------------------
-
- @Override
- public StreamRecord<T> createInstance() {
- try {
- return new StreamRecord<T>(typeSerializer.createInstance());
- } catch (Exception e) {
- throw new RuntimeException("Cannot instantiate StreamRecord.", e);
- }
- }
-
- @Override
- public StreamRecord<T> copy(StreamRecord<T> from) {
- return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
- }
-
- @Override
- public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
- reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
- return reuse;
- }
-
- @Override
- public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
- typeSerializer.serialize(value.getValue(), target);
- }
-
- @Override
- public StreamRecord<T> deserialize(DataInputView source) throws IOException {
- T element = typeSerializer.deserialize(source);
- return new StreamRecord<T>(element, 0);
- }
-
- @Override
- public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
- T element = typeSerializer.deserialize(reuse.getValue(), source);
- reuse.replace(element, 0);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- typeSerializer.copy(source, target);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof StreamRecordSerializer) {
- StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
-
- return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof StreamRecordSerializer;
- }
-
- @Override
- public int hashCode() {
- return typeSerializer.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
deleted file mode 100644
index ec90bff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A special exception that signifies that the cause exception came from a chained operator.
- */
-public class ExceptionInChainedOperatorException extends RuntimeException {
-
- private static final long serialVersionUID = 1L;
-
- public ExceptionInChainedOperatorException(Throwable cause) {
- this("Could not forward element to next operator", cause);
- }
-
- public ExceptionInChainedOperatorException(String message, Throwable cause) {
- super(message, requireNonNull(cause));
- }
-
- public Throwable getOriginalCause() {
- Throwable ex = this;
- do {
- ex = ex.getCause();
- } while (ex instanceof ExceptionInChainedOperatorException);
- return ex;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
deleted file mode 100644
index 5316ae4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
-
-public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
-
- private StreamInputProcessor<IN> inputProcessor;
-
- private volatile boolean running = true;
-
- @Override
- public void init() throws Exception {
- StreamConfig configuration = getConfiguration();
-
- TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs > 0) {
- InputGate[] inputGates = getEnvironment().getAllInputGates();
- inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
- getCheckpointBarrierListener(),
- configuration.getCheckpointMode(),
- getEnvironment().getIOManager(),
- getExecutionConfig().areTimestampsEnabled());
-
- // make sure that stream tasks report their I/O statistics
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
- inputProcessor.setReporter(reporter);
- }
- }
-
- @Override
- protected void run() throws Exception {
- // cache some references on the stack, to make the code more JIT friendly
- final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
- final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
- final Object lock = getCheckpointLock();
-
- while (running && inputProcessor.processInput(operator, lock)) {
- checkTimerException();
- }
- }
-
- @Override
- protected void cleanup() throws Exception {
- inputProcessor.cleanup();
- }
-
- @Override
- protected void cancelTask() {
- running = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
deleted file mode 100644
index b42b888..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OperatorChain<OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
-
- private final StreamOperator<?>[] allOperators;
-
- private final RecordWriterOutput<?>[] streamOutputs;
-
- private final Output<StreamRecord<OUT>> chainEntryPoint;
-
-
- public OperatorChain(StreamTask<OUT, ?> containingTask,
- StreamOperator<OUT> headOperator,
- AccumulatorRegistry.Reporter reporter) {
-
- final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
- final StreamConfig configuration = containingTask.getConfiguration();
- final boolean enableTimestamps = containingTask.getExecutionConfig().areTimestampsEnabled();
-
- // we read the chained configs, and the order of record writer registrations by output name
- Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
- chainedConfigs.put(configuration.getVertexID(), configuration);
-
- // create the final output stream writers
- // we iterate through all the out edges from this job vertex and create a stream output
- List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
- this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
-
- // from here on, we need to make sure that the output writers are shut down again on failure
- boolean success = false;
- try {
- for (int i = 0; i < outEdgesInOrder.size(); i++) {
- StreamEdge outEdge = outEdgesInOrder.get(i);
-
- RecordWriterOutput<?> streamOutput = createStreamOutput(
- outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
- containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
-
- this.streamOutputs[i] = streamOutput;
- streamOutputMap.put(outEdge, streamOutput);
- }
-
- // we create the chain of operators and grab the collector that leads into the chain
- List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
- this.chainEntryPoint = createOutputCollector(containingTask, configuration,
- chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
-
- this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
-
- // add the head operator to the end of the list
- this.allOperators[this.allOperators.length - 1] = headOperator;
-
- success = true;
- }
- finally {
- // make sure we clean up after ourselves in case of a failure after acquiring
- // the first resources
- if (!success) {
- for (RecordWriterOutput<?> output : this.streamOutputs) {
- if (output != null) {
- output.close();
- output.clearBuffers();
- }
- }
- }
- }
-
- }
-
-
- public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.broadcastEvent(barrier);
- }
- }
-
- public RecordWriterOutput<?>[] getStreamOutputs() {
- return streamOutputs;
- }
-
- public StreamOperator<?>[] getAllOperators() {
- return allOperators;
- }
-
- public Output<StreamRecord<OUT>> getChainEntryPoint() {
- return chainEntryPoint;
- }
-
- /**
- *
- * This method should be called before finishing the record emission, to make sure any data
- * that is still buffered will be sent. It also ensures that all data sending related
- * exceptions are recognized.
- *
- * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
- */
- public void flushOutputs() throws IOException {
- for (RecordWriterOutput<?> streamOutput : getStreamOutputs()) {
- streamOutput.flush();
- }
- }
-
- /**
- * This method releases all resources of the record writer output. It stops the output
- * flushing thread (if there is one) and releases all buffers currently held by the output
- * serializers.
- *
- * <p>This method should never fail.
- */
- public void releaseOutputs() {
- try {
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.close();
- }
- }
- finally {
- // make sure that we release the buffers in any case
- for (RecordWriterOutput<?> output : streamOutputs) {
- output.clearBuffers();
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // initialization utilities
- // ------------------------------------------------------------------------
-
- private static <T> Output<StreamRecord<T>> createOutputCollector(
- StreamTask<?, ?> containingTask,
- StreamConfig operatorConfig,
- Map<Integer, StreamConfig> chainedConfigs,
- ClassLoader userCodeClassloader,
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
- List<StreamOperator<?>> allOperators)
- {
- // We create a wrapper that will encapsulate the chained operators and network outputs
- OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
- CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
-
- // create collectors for the network outputs
- for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
- @SuppressWarnings("unchecked")
- RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
- wrapper.addCollector(output, outputEdge);
- }
-
- // Create collectors for the chained outputs
- for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
- int outputId = outputEdge.getTargetId();
- StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
-
- Output<StreamRecord<T>> output = createChainedOperator(
- containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
- wrapper.addCollector(output, outputEdge);
- }
- return wrapper;
- }
-
- private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
- StreamTask<?, ?> containingTask,
- StreamConfig operatorConfig,
- Map<Integer, StreamConfig> chainedConfigs,
- ClassLoader userCodeClassloader,
- Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
- List<StreamOperator<?>> allOperators)
- {
- // create the output that the operator writes to first. this may recursively create more operators
- Output<StreamRecord<OUT>> output = createOutputCollector(
- containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-
- // now create the operator and give it the output collector to write its output to
- OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
- chainedOperator.setup(containingTask, operatorConfig, output);
-
- allOperators.add(chainedOperator);
-
- if (containingTask.getExecutionConfig().isObjectReuseEnabled() || chainedOperator.isInputCopyingDisabled()) {
- return new ChainingOutput<IN>(chainedOperator);
- }
- else {
- TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
- return new CopyingChainingOutput<IN>(chainedOperator, inSerializer);
- }
- }
-
- private static <T> RecordWriterOutput<T> createStreamOutput(
- StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
- Environment taskEnvironment, boolean withTimestamps,
- AccumulatorRegistry.Reporter reporter, String taskName)
- {
- TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
-
- @SuppressWarnings("unchecked")
- StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
- LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
-
- ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
-
- StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
- new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
- output.setReporter(reporter);
-
- return new RecordWriterOutput<T>(output, outSerializer, withTimestamps);
- }
-
- // ------------------------------------------------------------------------
- // Collectors for output chaining
- // ------------------------------------------------------------------------
-
- private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-
- protected final OneInputStreamOperator<T, ?> operator;
-
- public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
- this.operator = operator;
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- try {
- operator.setKeyContextElement(record);
- operator.processElement(record);
- }
- catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
- }
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- try {
- operator.processWatermark(mark);
- }
- catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- operator.close();
- }
- catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
- }
- }
- }
-
- private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
-
- private final TypeSerializer<T> serializer;
-
- private final StreamRecord<T> copyRecord;
-
- public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
- super(operator);
- this.serializer = serializer;
- this.copyRecord = new StreamRecord<T>(null, 0L);
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- try {
- T copy = serializer.copy(record.getValue());
- copyRecord.replace(copy, record.getTimestamp());
-
- operator.setKeyContextElement(copyRecord);
- operator.processElement(copyRecord);
- }
- catch (Exception e) {
- throw new RuntimeException("Could not forward element to next operator", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
deleted file mode 100644
index 3d82275..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Task for executing streaming sources.
- *
- * One important aspect of this is that the checkpointing and the emission of elements must never
- * occur at the same time. The execution must be serial. This is achieved by having the contract
- * with the StreamFunction that it must only modify its state or emit elements in
- * a synchronized block that locks on the lock Object. Also, the modification of the state
- * and the emission of elements must happen in the same block of code that is protected by the
- * synchronized block.
- *
- * @param <OUT> Type of the output elements of this source.
- */
-public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
-
- @Override
- protected void init() {
- // does not hold any resources, so no initialization needed
- }
-
- @Override
- protected void cleanup() {
- // does not hold any resources, so no cleanup needed
- }
-
-
- @Override
- protected void run() throws Exception {
- final Object checkpointLock = getCheckpointLock();
- final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(getHeadOutput(), checkpointLock);
- headOperator.run(checkpointLock, output);
- }
-
- @Override
- protected void cancelTask() throws Exception {
- headOperator.cancel();
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Special output for sources that ensures that sources synchronize on the lock object before
- * emitting elements.
- *
- * <p>
- * This is required to ensure that no concurrent method calls on operators later in the chain
- * can occur. When operators register a timer the timer callback is synchronized
- * on the same lock object.
- *
- * @param <T> The type of elements emitted by the source.
- */
- private class SourceOutput<T> implements Output<T> {
-
- private final Output<T> output;
- private final Object lockObject;
-
- public SourceOutput(Output<T> output, Object lockObject) {
- this.output = output;
- this.lockObject = lockObject;
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- synchronized (lockObject) {
- output.emitWatermark(mark);
- }
- }
-
- @Override
- public void collect(T record) {
- synchronized (lockObject) {
- checkTimerException();
- output.collect(record);
- }
- }
-
- @Override
- public void close() {
- output.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
deleted file mode 100644
index 2125df1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
-
- private volatile boolean running = true;
-
- // ------------------------------------------------------------------------
-
- @Override
- protected void run() throws Exception {
-
- final String iterationId = getConfiguration().getIterationId();
- if (iterationId == null || iterationId.length() == 0) {
- throw new Exception("Missing iteration ID in the task configuration");
- }
-
- final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
- getEnvironment().getIndexInSubtaskGroup());
-
- final long iterationWaitTime = getConfiguration().getIterationWaitTime();
- final boolean shouldWait = iterationWaitTime > 0;
-
- final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
-
- // offer the queue for the tail
- BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
- LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
-
- // do the work
- try {
- @SuppressWarnings("unchecked")
- RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
-
- // If timestamps are enabled we make sure to remove cyclic watermark dependencies
- if (getExecutionConfig().areTimestampsEnabled()) {
- for (RecordWriterOutput<OUT> output : outputs) {
- output.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
- }
-
- while (running) {
- StreamRecord<OUT> nextRecord = shouldWait ?
- dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
- dataChannel.take();
-
- if (nextRecord != null) {
- for (RecordWriterOutput<OUT> output : outputs) {
- output.collect(nextRecord);
- }
- }
- else {
- // done
- break;
- }
- }
- }
- finally {
- // make sure that we remove the queue from the broker, to prevent a resource leak
- BlockingQueueBroker.INSTANCE.remove(brokerID);
- LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID);
- }
- }
-
- @Override
- protected void cancelTask() {
- running = false;
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void init() {
- // does not hold any resources, no initialization necessary
- }
-
- @Override
- protected void cleanup() throws Exception {
- // does not hold any resources, no cleanup necessary
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- /**
- * Creates the identification string with which head and tail task find the shared blocking
- * queue for the back channel. The identification string is unique per parallel head/tail pair
- * per iteration per job.
- *
- * @param jid The job ID.
- * @param iterationID The id of the iteration in the job.
- * @param subtaskIndex The parallel subtask number
- * @return The identification string.
- */
- public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
- return jid + "-" + iterationID + "-" + subtaskIndex;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
deleted file mode 100644
index 9bb5311..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
-
- @Override
- public void init() throws Exception {
- super.init();
-
- final String iterationId = getConfiguration().getIterationId();
- if (iterationId == null || iterationId.length() == 0) {
- throw new Exception("Missing iteration ID in the task configuration");
- }
-
- final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
- getEnvironment().getIndexInSubtaskGroup());
-
- final long iterationWaitTime = getConfiguration().getIterationWaitTime();
-
- LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
-
- @SuppressWarnings("unchecked")
- BlockingQueue<StreamRecord<IN>> dataChannel =
- (BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
-
- LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
-
- this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
- }
-
- private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
-
- private static final long serialVersionUID = 1L;
-
- @SuppressWarnings("NonSerializableFieldInSerializableClass")
- private final BlockingQueue<StreamRecord<IN>> dataChannel;
-
- private final long iterationWaitTime;
-
- private final boolean shouldWait;
-
- RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
- this.dataChannel = dataChannel;
- this.iterationWaitTime = iterationWaitTime;
- this.shouldWait = iterationWaitTime > 0;
- }
-
- @Override
- public void processElement(StreamRecord<IN> record) throws Exception {
- if (shouldWait) {
- dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
- }
- else {
- dataChannel.put(record);
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) {
- // ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
deleted file mode 100644
index 8c58e29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for all streaming tasks. A task is the unit of local processing that is deployed
- * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
- * the Task's operator chain. Operators that are chained together execute synchronously in the
- * same thread and hence on the same stream partition. A common case for these chaines
- * are successive map/flatmap/filter tasks.
- *
- * <p>The task chain contains one "head" operator and multiple chained operators.
- * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
- * as well as for sources, iteration heads and iteration tails.
- *
- * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
- * produced by the operators at the ends of the operator chain. Note that the chain may fork and
- * thus have multiple ends.
- *
- * The life cycle of the task is set up as follows:
- * <pre>
- * -- registerInputOutput()
- * |
- * +----> Create basic utils (config, etc) and load the chain of operators
- * +----> operators.setup()
- * +----> task specific init()
- *
- * -- restoreState() -> restores state of all operators in the chain
- *
- * -- invoke()
- * |
- * +----> open-operators()
- * +----> run()
- * +----> close-operators()
- * +----> dispose-operators()
- * +----> common cleanup
- * +----> task specific cleanup()
- * </pre>
- *
- * <p> The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
- * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
- * are called concurrently.
- *
- * @param <OUT>
- * @param <Operator>
- */
-public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
- extends AbstractInvokable
- implements StatefulTask<StreamTaskStateList> {
-
- /** The thread group that holds all trigger timer threads */
- public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-
- /** The logger used by the StreamTask and its subclasses */
- protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
-
- // ------------------------------------------------------------------------
-
- /**
- * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
- * we don't have concurrent method calls that void consistent checkpoints.
- */
- private final Object lock = new Object();
-
- /** the head operator that consumes the input streams of this task */
- protected Operator headOperator;
-
- /** The chain of operators executed by this task */
- private OperatorChain<OUT> operatorChain;
-
- /** The configuration of this streaming task */
- private StreamConfig configuration;
-
- /** The class loader used to load dynamic classes of a job */
- private ClassLoader userClassLoader;
-
- /** The state backend that stores the state and checkpoints for this task */
- private StateBackend<?> stateBackend;
-
- /** The executor service that schedules and calls the triggers of this task*/
- private ScheduledExecutorService timerService;
-
- /** The map of user-defined accumulators of this task */
- private Map<String, Accumulator<?, ?>> accumulatorMap;
-
- /** The state to be restored once the initialization is done */
- private StreamTaskStateList lazyRestoreState;
-
- /** This field is used to forward an exception that is caught in the timer thread. Subclasses
- * must ensure that exceptions stored here get thrown on the actual execution Thread. */
- private volatile TimerException timerException;
-
- /** Flag to mark the task "in operation", in which case check
- * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
- private volatile boolean isRunning;
-
-
- // ------------------------------------------------------------------------
- // Life cycle methods for specific implementations
- // ------------------------------------------------------------------------
-
- protected abstract void init() throws Exception;
-
- protected abstract void run() throws Exception;
-
- protected abstract void cleanup() throws Exception;
-
- protected abstract void cancelTask() throws Exception;
-
- // ------------------------------------------------------------------------
- // Core work methods of the Stream Task
- // ------------------------------------------------------------------------
-
- @Override
- public final void registerInputOutput() throws Exception {
- LOG.debug("registerInputOutput for {}", getName());
-
- boolean initializationCompleted = false;
- try {
- AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-
- userClassLoader = getUserCodeClassLoader();
- configuration = new StreamConfig(getTaskConfiguration());
- accumulatorMap = accumulatorRegistry.getUserMap();
-
- stateBackend = createStateBackend();
- stateBackend.initializeForJob(getEnvironment().getJobID());
-
- headOperator = configuration.getStreamOperator(userClassLoader);
- operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
-
- if (headOperator != null) {
- headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
- }
-
- timerService = Executors.newSingleThreadScheduledExecutor(
- new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
-
- // task specific initialization
- init();
-
- initializationCompleted = true;
- }
- finally {
- if (!initializationCompleted) {
- if (timerService != null) {
- timerService.shutdownNow();
- }
- if (operatorChain != null) {
- operatorChain.releaseOutputs();
- }
- }
- }
- }
-
- @Override
- public final void invoke() throws Exception {
- LOG.debug("Invoking {}", getName());
-
- boolean disposed = false;
- try {
- // first order of business is to ive operators back their state
- restoreStateLazy();
-
- // we need to make sure that any triggers scheduled in open() cannot be
- // executed before all operators are opened
- synchronized (lock) {
- openAllOperators();
- }
-
- // let the task do its work
- isRunning = true;
- run();
- isRunning = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finished task {}", getName());
- }
-
- // make sure no further checkpoint and notification actions happen.
- // we make sure that no other thread is currently in the locked scope before
- // we close the operators by trying to acquire the checkpoint scope lock
- // we also need to make sure that no triggers fire concurrently with the close logic
- synchronized (lock) {
- // this is part of the main logic, so if this fails, the task is considered failed
- closeAllOperators();
- }
-
- // make sure all buffered data is flushed
- operatorChain.flushOutputs();
-
- // make an attempt to dispose the operators such that failures in the dispose call
- // still let the computation fail
- tryDisposeAllOperators();
- disposed = true;
- }
- finally {
- isRunning = false;
-
- timerService.shutdownNow();
-
- // release the output resources. this method should never fail.
- if (operatorChain != null) {
- operatorChain.releaseOutputs();
- }
-
- // we must! perform this cleanup
-
- try {
- cleanup();
- }
- catch (Throwable t) {
- // catch and log the exception to not replace the original exception
- LOG.error("Error during cleanup of stream task.");
- }
-
- // if the operators were not disposed before, do a hard dispose
- if (!disposed) {
- disposeAllOperators();
- }
-
- try {
- if (stateBackend != null) {
- stateBackend.close();
- }
- } catch (Throwable t) {
- LOG.error("Error while closing the state backend", t);
- }
- }
- }
-
- @Override
- public final void cancel() throws Exception {
- isRunning = false;
- cancelTask();
- }
-
- private void openAllOperators() throws Exception {
- for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- if (operator != null) {
- operator.open();
- }
- }
- }
-
- private void closeAllOperators() throws Exception {
- // We need to close them first to last, since upstream operators in the chain might emit
- // elements in their close methods.
- StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
- for (int i = allOperators.length - 1; i >= 0; i--) {
- StreamOperator<?> operator = allOperators[i];
- if (operator != null) {
- operator.close();
- }
- }
- }
-
- private void tryDisposeAllOperators() throws Exception {
- for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- if (operator != null) {
- operator.dispose();
- }
- }
- }
-
- private void disposeAllOperators() {
- for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- try {
- if (operator != null) {
- operator.dispose();
- }
- }
- catch (Throwable t) {
- LOG.error("Error during disposal of stream operator.", t);
- }
- }
- }
-
- /**
- * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
- * shutdown method was never called.
- *
- * <p>
- * This should not be relied upon! It will cause shutdown to happen much later than if manual
- * shutdown is attempted, and cause threads to linger for longer than needed.
- */
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (timerService != null) {
- if (!timerService.isTerminated()) {
- LOG.warn("Timer service was not shut down. Shutting down in finalize().");
- }
- timerService.shutdown();
- }
- }
-
- // ------------------------------------------------------------------------
- // Access to properties and utilities
- // ------------------------------------------------------------------------
-
- /**
- * Gets the name of the task, in the form "taskname (2/5)".
- * @return The name of the task.
- */
- public String getName() {
- return getEnvironment().getTaskNameWithSubtasks();
- }
-
- /**
- * Gets the lock object on which all operations that involve data and state mutation have to lock.
-
- * @return The checkpoint lock object.
- */
- public Object getCheckpointLock() {
- return lock;
- }
-
- public StreamConfig getConfiguration() {
- return configuration;
- }
-
- public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
- return accumulatorMap;
- }
-
- public Output<StreamRecord<OUT>> getHeadOutput() {
- return operatorChain.getChainEntryPoint();
- }
-
- public RecordWriterOutput<?>[] getStreamOutputs() {
- return operatorChain.getStreamOutputs();
- }
-
- // ------------------------------------------------------------------------
- // Checkpoint and Restore
- // ------------------------------------------------------------------------
-
- @Override
- public void setInitialState(StreamTaskStateList initialState) {
- lazyRestoreState = initialState;
- }
-
- public void restoreStateLazy() throws Exception {
- if (lazyRestoreState != null) {
- LOG.info("Restoring checkpointed state to task {}", getName());
-
- try {
- final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
- final StreamTaskState[] states = lazyRestoreState.getState(userClassLoader);
-
- // be GC friendly
- lazyRestoreState = null;
-
- for (int i = 0; i < states.length; i++) {
- StreamTaskState state = states[i];
- StreamOperator<?> operator = allOperators[i];
-
- if (state != null && operator != null) {
- LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
- operator.restoreState(state);
- }
- else if (operator != null) {
- LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
- }
- }
- }
- catch (Exception e) {
- throw new Exception("Could not restore checkpointed state to operators and functions", e);
- }
- }
- }
-
- @Override
- public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
- LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
- synchronized (lock) {
- if (isRunning) {
-
- // since both state checkpointing and downstream barrier emission occurs in this
- // lock scope, they are an atomic operation regardless of the order in which they occur
- // we immediately emit the checkpoint barriers, so the downstream operators can start
- // their checkpoint work as soon as possible
- operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
- // now draw the state snapshot
- try {
- final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
- final StreamTaskState[] states = new StreamTaskState[allOperators.length];
-
- for (int i = 0; i < states.length; i++) {
- StreamOperator<?> operator = allOperators[i];
- if (operator != null) {
- StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
- states[i] = state.isEmpty() ? null : state;
- }
- }
-
- StreamTaskStateList allStates = new StreamTaskStateList(states);
- if (allStates.isEmpty()) {
- getEnvironment().acknowledgeCheckpoint(checkpointId);
- } else {
- getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
- }
- }
- catch (Exception e) {
- if (isRunning) {
- throw e;
- }
- }
- }
- }
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- synchronized (lock) {
- if (isRunning) {
- LOG.debug("Notification of complete checkpoint for task {}", getName());
-
- for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
- if (operator != null) {
- operator.notifyOfCompletedCheckpoint(checkpointId);
- }
- }
- }
- else {
- LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // State backend
- // ------------------------------------------------------------------------
-
- /**
- * Gets the state backend used by this task. The state backend defines how to maintain the
- * key/value state and how and where to store state snapshots.
- *
- * @return The state backend used by this task.
- */
- public StateBackend<?> getStateBackend() {
- return stateBackend;
- }
-
- private StateBackend<?> createStateBackend() throws Exception {
- StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
-
- if (configuredBackend != null) {
- // backend has been configured on the environment
- LOG.info("Using user-defined state backend: " + configuredBackend);
- return configuredBackend;
- } else {
- // see if we have a backend specified in the configuration
- Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
- String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
-
- if (backendName == null) {
- LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
- backendName = "jobmanager";
- }
-
- backendName = backendName.toLowerCase();
- switch (backendName) {
- case "jobmanager":
- LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
- return MemoryStateBackend.defaultInstance();
-
- case "filesystem":
- FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
- LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
- + backend.getBasePath() + "\")");
- return backend;
-
- default:
- try {
- @SuppressWarnings("rawtypes")
- Class<? extends StateBackendFactory> clazz =
- Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
-
- return (StateBackend<?>) clazz.newInstance();
- } catch (ClassNotFoundException e) {
- throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
- } catch (ClassCastException e) {
- throw new IllegalConfigurationException("The class configured under '" +
- ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
- backendName + ')');
- } catch (Throwable t) {
- throw new IllegalConfigurationException("Cannot create configured state backend", t);
- }
- }
- }
- }
-
- /**
- * Registers a timer.
- */
- public void registerTimer(final long timestamp, final Triggerable target) {
- long delay = Math.max(timestamp - System.currentTimeMillis(), 0);
-
- timerService.schedule(
- new TriggerTask(this, lock, target, timestamp),
- delay,
- TimeUnit.MILLISECONDS);
- }
-
- public void checkTimerException() throws TimerException {
- if (timerException != null) {
- throw timerException;
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return getName();
- }
-
- protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
- return new EventListener<CheckpointBarrier>() {
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- try {
- triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
- }
- catch (Exception e) {
- throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
- }
- }
- };
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Internal task that is invoked by the timer service and triggers the target.
- */
- private static final class TriggerTask implements Runnable {
-
- private final Object lock;
- private final Triggerable target;
- private final long timestamp;
- private final StreamTask<?, ?> task;
-
- TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp) {
- this.task = task;
- this.lock = lock;
- this.target = target;
- this.timestamp = timestamp;
- }
-
- @Override
- public void run() {
- synchronized (lock) {
- try {
- target.trigger(timestamp);
- } catch (Throwable t) {
- LOG.error("Caught exception while processing timer.", t);
- if (task.timerException == null) {
- task.timerException = new TimerException(t);
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
deleted file mode 100644
index 5680810..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-/**
- * An exception that is thrown by the stream vertices when encountering an
- * illegal condition.
- */
-public class StreamTaskException extends RuntimeException {
-
- /**
- * Serial version UID for serialization interoperability.
- */
- private static final long serialVersionUID = 8392043527067472439L;
-
- /**
- * Creates a compiler exception with no message and no cause.
- */
- public StreamTaskException() {
- }
-
- /**
- * Creates a compiler exception with the given message and no cause.
- *
- * @param message
- * The message for the exception.
- */
- public StreamTaskException(String message) {
- super(message);
- }
-
- /**
- * Creates a compiler exception with the given cause and no message.
- *
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamTaskException(Throwable cause) {
- super(cause);
- }
-
- /**
- * Creates a compiler exception with the given message and cause.
- *
- * @param message
- * The message for the exception.
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamTaskException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
deleted file mode 100644
index afeabd9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import java.io.Serializable;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
- * This state consists of any combination of those three:
- * <ul>
- * <li>The state of the stream operator, if it implements the Checkpointed interface.</li>
- * <li>The state of the user function, if it implements the Checkpointed interface.</li>
- * <li>The key/value state of the operator, if it executes on a KeyedDataStream.</li>
- * </ul>
- */
-public class StreamTaskState implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private StateHandle<?> operatorState;
-
- private StateHandle<Serializable> functionState;
-
- private HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates;
-
- // ------------------------------------------------------------------------
-
- public StateHandle<?> getOperatorState() {
- return operatorState;
- }
-
- public void setOperatorState(StateHandle<?> operatorState) {
- this.operatorState = operatorState;
- }
-
- public StateHandle<Serializable> getFunctionState() {
- return functionState;
- }
-
- public void setFunctionState(StateHandle<Serializable> functionState) {
- this.functionState = functionState;
- }
-
- public HashMap<String, KvStateSnapshot<?, ?, ?>> getKvStates() {
- return kvStates;
- }
-
- public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates) {
- this.kvStates = kvStates;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Checks if this state object actually contains any state, or if all of the state
- * fields are null.
- *
- * @return True, if all state is null, false if at least one state is not null.
- */
- public boolean isEmpty() {
- return operatorState == null & functionState == null & kvStates == null;
- }
-
- /**
- * Discards all the contained states and sets them to null.
- *
- * @throws Exception Forwards exceptions that occur when releasing the
- * state handles and snapshots.
- */
- public void discardState() throws Exception {
- StateHandle<?> operatorState = this.operatorState;
- StateHandle<?> functionState = this.functionState;
- HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates = this.kvStates;
-
- if (operatorState != null) {
- operatorState.discardState();
- }
- if (functionState != null) {
- functionState.discardState();
- }
- if (kvStates != null) {
- while (kvStates.size() > 0) {
- try {
- Iterator<KvStateSnapshot<?, ?, ?>> values = kvStates.values().iterator();
- while (values.hasNext()) {
- KvStateSnapshot<?, ?, ?> s = values.next();
- s.discardState();
- values.remove();
- }
- }
- catch (ConcurrentModificationException e) {
- // fall through the loop
- }
- }
- }
-
- this.operatorState = null;
- this.functionState = null;
- this.kvStates = null;
- }
-}
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
deleted file mode 100644
index 7b8dbd5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * List of task states for a chain of streaming tasks.
- */
-public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
-
- private static final long serialVersionUID = 1L;
-
- /** The states for all operator */
- private final StreamTaskState[] states;
-
-
- public StreamTaskStateList(StreamTaskState[] states) {
- this.states = states;
- }
-
- public boolean isEmpty() {
- for (StreamTaskState state : states) {
- if (state != null) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
- return states;
- }
-
- @Override
- public void discardState() throws Exception {
- for (StreamTaskState state : states) {
- if (state != null) {
- state.discardState();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
deleted file mode 100644
index 3e1c1e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-/**
- * {@code RuntimeException} for wrapping exceptions that are thrown in the timer callback of
- * the timer service in {@link StreamTask}.
- */
-public class TimerException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public TimerException(Throwable cause) {
- super(cause);
- }
-
- @Override
- public String toString() {
- return "TimerException{" + getCause() + "}";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
deleted file mode 100644
index d2d8a2e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-
-public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
-
- private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
-
- private volatile boolean running = true;
-
- @Override
- public void init() throws Exception {
- StreamConfig configuration = getConfiguration();
- ClassLoader userClassLoader = getUserCodeClassLoader();
-
- TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
- TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
- List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = inEdges.get(i).getTypeNumber();
- InputGate reader = getEnvironment().getInputGate(i);
- switch (inputType) {
- case 1:
- inputList1.add(reader);
- break;
- case 2:
- inputList2.add(reader);
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
- }
- }
-
- this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
- inputDeserializer1, inputDeserializer2,
- getCheckpointBarrierListener(),
- configuration.getCheckpointMode(),
- getEnvironment().getIOManager(),
- getExecutionConfig().areTimestampsEnabled());
-
- // make sure that stream tasks report their I/O statistics
- AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
- AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
- this.inputProcessor.setReporter(reporter);
- }
-
- @Override
- protected void run() throws Exception {
- // cache some references on the stack, to make the code more JIT friendly
- final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
- final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
- final Object lock = getCheckpointLock();
-
- while (running && inputProcessor.processInput(operator, lock)) {
- checkTimerException();
- }
- }
-
- @Override
- protected void cleanup() throws Exception {
- inputProcessor.cleanup();
- }
-
- @Override
- protected void cancelTask() {
- running = false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
deleted file mode 100644
index a40ae3a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains classes that realize streaming tasks. These tasks are
- * executable stream consumers and producers that are scheduled by the distributed
- * dataflow runtime. Each task occupies one execution slot and is run with by an
- * executing thread.
- * <p>
- * The tasks merely set up the distributed stream coordination and the checkpointing.
- * Internally, the tasks create one or more operators, perform the stream transformations.
- */
-package org.apache.flink.streaming.runtime.tasks;
\ No newline at end of file