You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:51 UTC
[10/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
deleted file mode 100644
index b79b7e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/UID.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamrecord;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Object for creating unique IDs for {@link StreamRecord}s.
- *
- **/
-public class UID implements IOReadableWritable, Serializable {
- private static final long serialVersionUID = 1L;
-
- private ByteBuffer uid;
- private static Random random = new Random();
-
- public UID() {
- uid = ByteBuffer.allocate(20);
- }
-
- // TODO: consider sequential ids
- public UID(int channelID) {
- byte[] uuid = new byte[16];
- random.nextBytes(uuid);
- uid = ByteBuffer.allocate(20).putInt(channelID).put(uuid);
- }
-
- UID(byte[] id) {
- uid = ByteBuffer.wrap(id);
- }
-
- public int getChannelId() {
- uid.position(0);
- return uid.getInt();
- }
-
- public byte[] getGeneratedId() {
- uid.position(4);
- return uid.slice().array();
- }
-
- public byte[] getId() {
- uid.position(0);
- return uid.array();
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.write(uid.array());
- }
-
- private void writeObject(ObjectOutputStream stream) throws IOException {
- stream.write(uid.array());
- }
-
- private void readObject(java.io.ObjectInputStream stream) throws IOException,
- ClassNotFoundException {
- byte[] uidA = new byte[20];
- stream.read(uidA);
- uid = ByteBuffer.allocate(20).put(uidA);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- byte[] uidByteArray = new byte[20];
- in.readFully(uidByteArray, 0, 20);
- uid = ByteBuffer.wrap(uidByteArray);
- }
-
- @Override
- public String toString() {
- return getChannelId() + "-" + Long.toHexString(uid.getLong(4)) + "-"
- + Long.toHexString(uid.getLong(12));
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(getId());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- } else {
- try {
- UID other = (UID) obj;
- return Arrays.equals(this.getId(), other.getId());
- } catch (ClassCastException e) {
- return false;
- }
- }
- }
-
- public UID copy() {
- return new UID(Arrays.copyOf(uid.array(), 20));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
deleted file mode 100644
index f277be0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamEdge;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.CoRecordReader;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.io.InputGateFactory;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
-
- protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
- protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
-
- MutableObjectIterator<StreamRecord<IN1>> inputIter1;
- MutableObjectIterator<StreamRecord<IN2>> inputIter2;
-
- CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
- CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
-
- private static int numTasks;
-
- public CoStreamVertex() {
- numTasks = newVertex();
- instanceID = numTasks;
- }
-
- private void setDeserializers() {
- inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
- inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
- }
-
- @Override
- public void setInputsOutputs() {
- outputHandler = new OutputHandler<OUT>(this);
-
- setConfigInputs();
-
- coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
- inputDeserializer1, inputDeserializer2);
- }
-
- @Override
- public void clearBuffers() throws IOException {
- outputHandler.clearWriters();
- coReader.clearBuffers();
- coReader.cleanup();
- }
-
- protected void setConfigInputs() throws StreamVertexException {
- setDeserializers();
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
- ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
- List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
- for (int i = 0; i < numberOfInputs; i++) {
- int inputType = inEdges.get(i).getTypeNumber();
- InputGate reader = getEnvironment().getInputGate(i);
- switch (inputType) {
- case 1:
- inputList1.add(reader);
- break;
- case 2:
- inputList2.add(reader);
- break;
- default:
- throw new RuntimeException("Invalid input type number: " + inputType);
- }
- }
-
- final InputGate reader1 = InputGateFactory.createInputGate(inputList1);
- final InputGate reader2 = InputGateFactory.createInputGate(inputList2);
-
- coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
- reader1, reader2);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- switch (index) {
- case 0:
- return (MutableObjectIterator<X>) inputIter1;
- case 1:
- return (MutableObjectIterator<X>) inputIter2;
- default:
- throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
- }
- }
-
- @Override
- public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
- throw new UnsupportedOperationException("Currently unsupported for connected streams");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
- switch (index) {
- case 0:
- return (StreamRecordSerializer<X>) inputDeserializer1;
- case 1:
- return (StreamRecordSerializer<X>) inputDeserializer2;
- default:
- throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X, Y> CoReaderIterator<X, Y> getCoReader() {
- return (CoReaderIterator<X, Y>) coIter;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
deleted file mode 100644
index c6a4377..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.IndexedMutableReader;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.streaming.io.InputGateFactory;
-
-public class InputHandler<IN> {
- private StreamRecordSerializer<IN> inputSerializer = null;
- private IndexedReaderIterator<StreamRecord<IN>> inputIter;
- private IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>> inputs;
-
- private StreamVertex<IN, ?> streamVertex;
- private StreamConfig configuration;
-
- public InputHandler(StreamVertex<IN, ?> streamComponent) {
- this.streamVertex = streamComponent;
- this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
- try {
- setConfigInputs();
- } catch (Exception e) {
- throw new StreamVertexException("Cannot register inputs for "
- + getClass().getSimpleName(), e);
- }
-
- }
-
- protected void setConfigInputs() throws StreamVertexException {
- inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs > 0) {
- InputGate inputGate = InputGateFactory.createInputGate(streamVertex.getEnvironment().getAllInputGates());
- inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
-
- inputs.registerTaskEventListener(streamVertex.getSuperstepListener(),
- StreamingSuperstep.class);
-
- inputIter = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inputSerializer);
- }
- }
-
- protected static <T> IndexedReaderIterator<StreamRecord<T>> staticCreateInputIterator(
- MutableReader<?> inputReader, TypeSerializer<StreamRecord<T>> serializer) {
-
- // generic data type serialization
- @SuppressWarnings("unchecked")
- IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>> reader = (IndexedMutableReader<DeserializationDelegate<StreamRecord<T>>>) inputReader;
- final IndexedReaderIterator<StreamRecord<T>> iter = new IndexedReaderIterator<StreamRecord<T>>(
- reader, serializer);
- return iter;
- }
-
- public StreamRecordSerializer<IN> getInputSerializer() {
- return inputSerializer;
- }
-
- public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
- return inputIter;
- }
-
- public void clearReaders() throws IOException {
- if (inputs != null) {
- inputs.clearBuffers();
- inputs.cleanup();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
deleted file mode 100644
index 40a83f3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.StreamEdge;
-import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.RecordWriterFactory;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
- private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
- private StreamVertex<?, OUT> vertex;
- private StreamConfig configuration;
- private ClassLoader cl;
- private Collector<OUT> outerCollector;
-
- public List<ChainableInvokable<?, ?>> chainedInvokables;
-
- private Map<StreamEdge, StreamOutput<?>> outputMap;
-
- private Map<Integer, StreamConfig> chainedConfigs;
- private List<StreamEdge> outEdgesInOrder;
-
- public OutputHandler(StreamVertex<?, OUT> vertex) {
-
- // Initialize some fields
- this.vertex = vertex;
- this.configuration = new StreamConfig(vertex.getTaskConfiguration());
- this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
- this.outputMap = new HashMap<StreamEdge, StreamOutput<?>>();
- this.cl = vertex.getUserCodeClassLoader();
-
- // We read the chained configs, and the order of record writer
- // registrations by outputname
- this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
- this.chainedConfigs.put(configuration.getVertexID(), configuration);
-
- this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
-
- // We iterate through all the out edges from this job vertex and create
- // a stream output
- for (StreamEdge outEdge : outEdgesInOrder) {
- StreamOutput<?> streamOutput = createStreamOutput(
- outEdge,
- outEdge.getTargetID(),
- chainedConfigs.get(outEdge.getSourceID()),
- outEdgesInOrder.indexOf(outEdge));
- outputMap.put(outEdge, streamOutput);
- }
-
- // We create the outer collector that will be passed to the first task
- // in the chain
- this.outerCollector = createChainedCollector(configuration);
- }
-
- public void broadcastBarrier(long id) throws IOException, InterruptedException {
- StreamingSuperstep barrier = new StreamingSuperstep(id);
- for (StreamOutput<?> streamOutput : outputMap.values()) {
- streamOutput.broadcastEvent(barrier);
- }
- }
-
- public Collection<StreamOutput<?>> getOutputs() {
- return outputMap.values();
- }
-
- /**
- * This method builds up a nested collector which encapsulates all the
- * chained operators and their network output. The result of this recursive
- * call will be passed as collector to the first invokable in the chain.
- *
- * @param chainedTaskConfig
- * The configuration of the starting operator of the chain, we
- * use this paramater to recursively build the whole chain
- * @return Returns the collector for the chain starting from the given
- * config
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
-
-
- // We create a wrapper that will encapsulate the chained operators and
- // network outputs
-
- OutputSelectorWrapper<OUT> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(cl);
- CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(outputSelectorWrapper);
-
- // Create collectors for the network outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
- Collector<?> outCollector = outputMap.get(outputEdge);
-
- wrapper.addCollector(outCollector, outputEdge);
- }
-
- // Create collectors for the chained outputs
- for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
- Integer output = outputEdge.getTargetID();
-
- Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
-
- wrapper.addCollector(outCollector, outputEdge);
- }
-
- if (chainedTaskConfig.isChainStart()) {
- // The current task is the first chained task at this vertex so we
- // return the wrapper
- return wrapper;
- } else {
- // The current task is a part of the chain so we get the chainable
- // invokable which will be returned and set it up using the wrapper
- ChainableInvokable chainableInvokable = chainedTaskConfig.getUserInvokable(vertex
- .getUserCodeClassLoader());
- chainableInvokable.setup(wrapper,
- chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
-
- chainedInvokables.add(chainableInvokable);
- return chainableInvokable;
- }
-
- }
-
- public Collector<OUT> getCollector() {
- return outerCollector;
- }
-
- /**
- * We create the StreamOutput for the specific output given by the id, and
- * the configuration of its source task
- *
- * @param outputVertex
- * Name of the output to which the streamoutput will be set up
- * @param upStreamConfig
- * The config of upStream task
- * @return The created StreamOutput
- */
- private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
- StreamConfig upStreamConfig, int outputIndex) {
-
- StreamRecordSerializer<T> outSerializer = upStreamConfig
- .getTypeSerializerOut1(vertex.userClassLoader);
- SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
-
- if (outSerializer != null) {
- outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
- outSerializationDelegate.setInstance(outSerializer.createInstance());
- }
-
- @SuppressWarnings("unchecked")
- StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
- ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
-
- RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
- RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-
- StreamOutput<T> streamOutput = new StreamOutput<T>(output, vertex.instanceID,
- outSerializationDelegate);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
- .getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
- }
-
- return streamOutput;
- }
-
- public void flushOutputs() throws IOException, InterruptedException {
- for (StreamOutput<?> streamOutput : getOutputs()) {
- streamOutput.close();
- }
- }
-
- public void clearWriters() {
- for (StreamOutput<?> output : outputMap.values()) {
- output.clearBuffers();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
deleted file mode 100644
index 816c0d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Collection;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.collector.StreamOutput;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
-
- private Collection<StreamOutput<?>> outputs;
-
- private static int numSources;
- private Integer iterationId;
- @SuppressWarnings("rawtypes")
- private BlockingQueue<StreamRecord> dataChannel;
- private long iterationWaitTime;
- private boolean shouldWait;
-
- @SuppressWarnings("rawtypes")
- public StreamIterationHead() {
- numSources = newVertex();
- instanceID = numSources;
- dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
- }
-
- @Override
- public void setInputsOutputs() {
- outputHandler = new OutputHandler<OUT>(this);
- outputs = outputHandler.getOutputs();
-
- iterationId = configuration.getIterationId();
- iterationWaitTime = configuration.getIterationWaitTime();
- shouldWait = iterationWaitTime > 0;
-
- try {
- BlockingQueueBroker.instance().handIn(iterationId.toString()+"-"
- +getEnvironment().getIndexInSubtaskGroup(), dataChannel);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Iteration source {} invoked with instance id {}", getName(), getInstanceID());
- }
-
- try {
- StreamRecord<OUT> nextRecord;
-
- while (true) {
- if (shouldWait) {
- nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
- } else {
- nextRecord = dataChannel.take();
- }
- if (nextRecord == null) {
- break;
- }
- for (StreamOutput<?> output : outputs) {
- ((StreamOutput<OUT>) output).collect(nextRecord.getObject());
- }
- }
-
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
- }
- throw e;
- } finally {
- // Cleanup
- outputHandler.flushOutputs();
- clearBuffers();
- }
-
- }
-
- @Override
- protected void setInvokable() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
deleted file mode 100644
index ab09aff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
-
- private InputHandler<IN> inputHandler;
-
- private Integer iterationId;
- @SuppressWarnings("rawtypes")
- private BlockingQueue<StreamRecord> dataChannel;
- private long iterationWaitTime;
- private boolean shouldWait;
-
- public StreamIterationTail() {
- }
-
- @Override
- public void setInputsOutputs() {
- try {
- inputHandler = new InputHandler<IN>(this);
-
- iterationId = configuration.getIterationId();
- iterationWaitTime = configuration.getIterationWaitTime();
- shouldWait = iterationWaitTime > 0;
- dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
- +getEnvironment().getIndexInSubtaskGroup());
- } catch (Exception e) {
- throw new StreamVertexException(String.format(
- "Cannot register inputs of StreamIterationSink %s", iterationId), e);
- }
- }
-
- @Override
- public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Iteration sink {} invoked", getName());
- }
-
- try {
- forwardRecords();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Iteration sink {} invoke finished", getName());
- }
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
- }
- throw e;
- } finally {
- // Cleanup
- clearBuffers();
- }
- }
-
- protected void forwardRecords() throws Exception {
- StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
- while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
- if (!pushToQueue(reuse)) {
- break;
- }
- reuse = inputHandler.getInputSerializer().createInstance();
- }
- }
-
- private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
- try {
- if (shouldWait) {
- return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
- } else {
- dataChannel.put(record);
- return true;
- }
- } catch (InterruptedException e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
- StringUtils.stringifyException(e));
- throw e;
- }
- return false;
- }
- }
-
- @Override
- protected void setInvokable() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
deleted file mode 100644
index 1c904ca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public interface StreamTaskContext<OUT> {
-
- StreamConfig getConfig();
-
- ClassLoader getUserCodeClassLoader();
-
- <X> MutableObjectIterator<X> getInput(int index);
-
- <X> IndexedReaderIterator<X> getIndexedInput(int index);
-
- <X> StreamRecordSerializer<X> getInputSerializer(int index);
-
- Collector<OUT> getOutputCollector();
-
- <X, Y> CoReaderIterator<X, Y> getCoReader();
-
- ExecutionConfig getExecutionConfig();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
deleted file mode 100644
index b56eda3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
-import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.messages.CheckpointingMessages;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import akka.actor.ActorRef;
-
-public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
- BarrierTransceiver, OperatorStateCarrier {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
-
- private static int numTasks;
-
- protected StreamConfig configuration;
- protected int instanceID;
- private static int numVertices = 0;
-
- private InputHandler<IN> inputHandler;
- protected OutputHandler<OUT> outputHandler;
- private StreamInvokable<IN, OUT> userInvokable;
- protected volatile boolean isRunning = false;
-
- private StreamingRuntimeContext context;
- private Map<String, OperatorState<?>> states;
-
- protected ClassLoader userClassLoader;
-
- private EventListener<TaskEvent> superstepListener;
-
- public StreamVertex() {
- userInvokable = null;
- numTasks = newVertex();
- instanceID = numTasks;
- superstepListener = new SuperstepEventListener();
- }
-
- protected static int newVertex() {
- numVertices++;
- return numVertices;
- }
-
- @Override
- public void registerInputOutput() {
- initialize();
- setInputsOutputs();
- setInvokable();
- }
-
- protected void initialize() {
- this.userClassLoader = getUserCodeClassLoader();
- this.configuration = new StreamConfig(getTaskConfiguration());
- this.states = new HashMap<String, OperatorState<?>>();
- this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
- }
-
- @Override
- public void broadcastBarrierFromSource(long id) {
- // Only called at input vertices
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received barrier from jobmanager: " + id);
- }
- actOnBarrier(id);
- }
-
- /**
- * This method is called to confirm that a barrier has been fully processed.
- * It sends an acknowledgment to the jobmanager. In the current version if
- * there is user state it also checkpoints the state to the jobmanager.
- */
- @Override
- public void confirmBarrier(long barrierID) throws IOException {
-
- if (configuration.getStateMonitoring() && !states.isEmpty()) {
- getEnvironment().getJobManager().tell(
- new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
- .getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
- new LocalStateHandle(states)), ActorRef.noSender());
- } else {
- getEnvironment().getJobManager().tell(
- new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
- context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
- }
-
- }
-
- public void setInputsOutputs() {
- inputHandler = new InputHandler<IN>(this);
- outputHandler = new OutputHandler<OUT>(this);
- }
-
- protected void setInvokable() {
- userInvokable = configuration.getUserInvokable(userClassLoader);
- userInvokable.setup(this);
- }
-
- public String getName() {
- return getEnvironment().getTaskName();
- }
-
- public int getInstanceID() {
- return instanceID;
- }
-
- public StreamingRuntimeContext createRuntimeContext(String taskName,
- Map<String, OperatorState<?>> states) {
- Environment env = getEnvironment();
- return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
- getExecutionConfig(), states);
- }
-
- @Override
- public void invoke() throws Exception {
- this.isRunning = true;
-
- boolean operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
- }
-
- try {
- userInvokable.setRuntimeContext(context);
-
- operatorOpen = true;
- openOperator();
-
- userInvokable.invoke();
-
- closeOperator();
- operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
- }
-
- } catch (Exception e) {
-
- if (operatorOpen) {
- try {
- closeOperator();
- } catch (Throwable t) {
- }
- }
-
- if (LOG.isErrorEnabled()) {
- LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
- }
- throw e;
- } finally {
- this.isRunning = false;
- // Cleanup
- outputHandler.flushOutputs();
- clearBuffers();
- }
-
- }
-
- protected void openOperator() throws Exception {
- userInvokable.open(getTaskConfiguration());
-
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.setRuntimeContext(context);
- invokable.open(getTaskConfiguration());
- }
- }
-
- protected void closeOperator() throws Exception {
- userInvokable.close();
-
- for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
- invokable.close();
- }
- }
-
- protected void clearBuffers() throws IOException {
- if (outputHandler != null) {
- outputHandler.clearWriters();
- }
- if (inputHandler != null) {
- inputHandler.clearReaders();
- }
- }
-
- @Override
- public void cancel() {
- if (userInvokable != null) {
- userInvokable.cancel();
- }
- }
-
- @Override
- public StreamConfig getConfig() {
- return configuration;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- if (index == 0) {
- return (MutableObjectIterator<X>) inputHandler.getInputIter();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
- if (index == 0) {
- return (IndexedReaderIterator<X>) inputHandler.getInputIter();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
- if (index == 0) {
- return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @Override
- public Collector<OUT> getOutputCollector() {
- return outputHandler.getCollector();
- }
-
- @Override
- public <X, Y> CoReaderIterator<X, Y> getCoReader() {
- throw new IllegalArgumentException("CoReader not available");
- }
-
- public EventListener<TaskEvent> getSuperstepListener() {
- return this.superstepListener;
- }
-
- /**
- * Method to be called when a barrier is received from all the input
- * channels. It should broadcast the barrier to the output operators,
- * checkpoint the state and send an ack.
- *
- * @param id
- */
- private synchronized void actOnBarrier(long id) {
- if (isRunning) {
- try {
- outputHandler.broadcastBarrier(id);
- confirmBarrier(id);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep " + id + " processed: " + StreamVertex.this);
- }
- } catch (Exception e) {
- // Only throw any exception if the vertex is still running
- if (isRunning) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- @Override
- public String toString() {
- return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
- }
-
- /**
- * Re-injects the user states into the map
- */
- @Override
- public void injectState(StateHandle stateHandle) {
- this.states.putAll(stateHandle.getState(userClassLoader));
- }
-
- private class SuperstepEventListener implements EventListener<TaskEvent> {
-
- @Override
- public void onEvent(TaskEvent event) {
- actOnBarrier(((StreamingSuperstep) event).getId());
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
deleted file mode 100644
index ed8b91e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-/**
- * An exception that is thrown by the stream verices when encountering an
- * illegal condition.
- */
-public class StreamVertexException extends RuntimeException {
-
- /**
- * Serial version UID for serialization interoperability.
- */
- private static final long serialVersionUID = 8392043527067472439L;
-
- /**
- * Creates a compiler exception with no message and no cause.
- */
- public StreamVertexException() {
- }
-
- /**
- * Creates a compiler exception with the given message and no cause.
- *
- * @param message
- * The message for the exception.
- */
- public StreamVertexException(String message) {
- super(message);
- }
-
- /**
- * Creates a compiler exception with the given cause and no message.
- *
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamVertexException(Throwable cause) {
- super(cause);
- }
-
- /**
- * Creates a compiler exception with the given message and cause.
- *
- * @param message
- * The message for the exception.
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamVertexException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
deleted file mode 100644
index ff876b1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
- * operators.
- */
-public class StreamingRuntimeContext extends RuntimeUDFContext {
-
- private final Environment env;
- private final Map<String, OperatorState<?>> operatorStates;
-
- public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
- ExecutionConfig executionConfig, Map<String, OperatorState<?>> operatorStates) {
- super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
- executionConfig, env.getCopyTask());
- this.env = env;
- this.operatorStates = operatorStates;
- }
-
- /**
- * Returns the operator state registered by the given name for the operator.
- *
- * @param name
- * Name of the operator state to be returned.
- * @return The operator state.
- */
- public OperatorState<?> getState(String name) {
- if (operatorStates == null) {
- throw new RuntimeException("No state has been registered for this operator.");
- } else {
- OperatorState<?> state = operatorStates.get(name);
- if (state != null) {
- return state;
- } else {
- throw new RuntimeException("No state has been registered for the name: " + name);
- }
- }
- }
-
- /**
- * Returns whether there is a state stored by the given name
- */
- public boolean containsState(String name) {
- return operatorStates.containsKey(name);
- }
-
- /**
- * This is a beta feature </br></br> Register an operator state for this
- * operator by the given name. This name can be used to retrieve the state
- * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
- * obtain the {@link StreamingRuntimeContext} from the user-defined function
- * use the {@link RichFunction#getRuntimeContext()} method.
- *
- * @param name
- * The name of the operator state.
- * @param state
- * The state to be registered for this name.
- */
- public void registerState(String name, OperatorState<?> state) {
- if (state == null) {
- throw new RuntimeException("Cannot register null state");
- } else {
- if (operatorStates.containsKey(name)) {
- throw new RuntimeException("State is already registered");
- } else {
- operatorStates.put(name, state);
- }
- }
- }
-
- /**
- * Returns the input split provider associated with the operator.
- *
- * @return The input split provider.
- */
- public InputSplitProvider getInputSplitProvider() {
- return env.getInputSplitProvider();
- }
-
- /**
- * Returns the stub parameters associated with the {@link TaskConfig} of the
- * operator.
- *
- * @return The stub parameters.
- */
- public Configuration getTaskStubParameters() {
- return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
deleted file mode 100644
index d46ca79..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamvertex;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
-
-public class StreamingSuperstep extends TaskEvent {
-
- protected long id;
-
- public StreamingSuperstep() {
-
- }
-
- public StreamingSuperstep(long id) {
- this.id = id;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeLong(id);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- id = in.readLong();
- }
-
- public long getId() {
- return id;
- }
-
- public boolean equals(Object other) {
- if (other == null || !(other instanceof StreamingSuperstep)) {
- return false;
- } else {
- return ((StreamingSuperstep) other).id == this.id;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index abe5298..cab2bef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
* This interface extends the {@link TriggerPolicy} interface with functionality
* for active triggers. Active triggers can act in two ways:
*
- * 1) Whenever an element arrives at the invokable, the
+ * 1) Whenever an element arrives at the operator, the
* {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
* first. It can return zero ore more fake data points which will be added
* before the currently arrived real element gets processed. This allows to
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
* datapoint is always considered as triggered.
*
* 2) An active trigger has a factory method for a runnable. This factory method
- * gets called at the start up of the invokable. The returned runnable will be
+ * gets called at the start up of the operator. The returned runnable will be
* executed in its own thread and can submit fake elements at any time threw an
* {@link ActiveTriggerCallback}. This allows to have time based triggers based
* on any system internal time measure. Triggers are not called on fake
@@ -44,7 +44,7 @@ import org.apache.flink.streaming.api.windowing.helper.Timestamp;
public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
/**
- * Whenever an element arrives at the invokable, the
+ * Whenever an element arrives at the operator, the
* {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
* first. It can return zero ore more fake data points which will be added
* before the the currently arrived real element gets processed. This allows
@@ -53,7 +53,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
* datapoint is always considered as triggered.
*
* @param datapoint
- * the data point which arrived at the invokable
+ * the data point which arrived at the operator
* @return zero ore more fake data points which will be added before the the
* currently arrived real element gets processed.
*/
@@ -61,7 +61,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
/**
* This is the factory method for a runnable. This factory method gets
- * called at the start up of the invokable. The returned runnable will be
+ * called at the start up of the operator. The returned runnable will be
* executed in its own thread and can submit fake elements at any time threw
* an {@link ActiveTriggerCallback}. This allows to have time based triggers
* based on any system internal time measure. Triggers are not called on
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
index 1937b3f..6bc5072 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
/**
- * When used in {@link GroupedWindowInvokable}, eviction policies must
+ * When used in grouped windowing, eviction policies must
* provide a clone method. Eviction policies get cloned to provide an own
* instance for each group and respectively each individual element buffer as
* groups maintain their own buffers with the elements belonging to the
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
index 6a04461..5b5e20b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
/**
- * When used in {@link GroupedWindowInvokable}, trigger policies can provide
+ * When used in grouped windowing, trigger policies can provide
* a clone method. Cloneable triggers can can be used in a distributed manner,
* which means they get cloned to provide an own instance for each group. This
* allows each group to trigger individually and only based on the elements
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
index c224ad4..b95053a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
@@ -40,7 +40,7 @@ public interface EvictionPolicy<DATA> extends Serializable {
* @param triggered
* Information whether the UDF was triggered or not
* @param bufferSize
- * The current size of the element buffer at the invokable
+ * The current size of the element buffer at the operator
* @return The number of elements to be deleted from the buffer
*/
public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize);
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
deleted file mode 100644
index 6c198a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class encapsulating the functionality that is necessary to sync inputs on
- * superstep barriers. Once a barrier is received from an input channel, whe
- * should not process further buffers from that channel until we received the
- * barrier from all other channels as well. To avoid back-pressuring the
- * readers, we buffer up the new data received from the blocked channels until
- * the blocks are released.
- *
- */
-public class BarrierBuffer {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
- private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
- private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
-
- private Set<Integer> blockedChannels = new HashSet<Integer>();
- private int totalNumberOfInputChannels;
-
- private StreamingSuperstep currentSuperstep;
- private boolean superstepStarted;
-
- private AbstractReader reader;
-
- private InputGate inputGate;
-
- private SpillReader spillReader;
- private BufferSpiller bufferSpiller;
-
- private boolean inputFinished = false;
-
- private BufferOrEvent endOfStreamEvent = null;
-
- public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
- this.inputGate = inputGate;
- totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.reader = reader;
- try {
- this.bufferSpiller = new BufferSpiller();
- this.spillReader = new SpillReader();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- /**
- * Starts the next superstep in the buffer
- *
- * @param superstep
- * The next superstep
- */
- protected void startSuperstep(StreamingSuperstep superstep) {
- this.currentSuperstep = superstep;
- this.superstepStarted = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Superstep started with id: " + superstep.getId());
- }
- }
-
- /**
- * Get then next non-blocked non-processed BufferOrEvent. Returns null if
- * not available.
- *
- * @throws IOException
- */
- protected BufferOrEvent getNonProcessed() throws IOException {
- SpillingBufferOrEvent nextNonprocessed;
-
- while ((nextNonprocessed = nonprocessed.poll()) != null) {
- BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
- if (isBlocked(boe.getChannelIndex())) {
- blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
- } else {
- return boe;
- }
- }
-
- return null;
- }
-
- /**
- * Checks whether a given channel index is blocked for this inputgate
- *
- * @param channelIndex
- * The channel index to check
- */
- protected boolean isBlocked(int channelIndex) {
- return blockedChannels.contains(channelIndex);
- }
-
- /**
- * Checks whether all channels are blocked meaning that barriers are
- * received from all channels
- */
- protected boolean isAllBlocked() {
- return blockedChannels.size() == totalNumberOfInputChannels;
- }
-
- /**
- * Returns the next non-blocked BufferOrEvent. This is a blocking operator.
- */
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
- // If there are non-processed buffers from the previously blocked ones,
- // we get the next
- BufferOrEvent bufferOrEvent = getNonProcessed();
-
- if (bufferOrEvent != null) {
- return bufferOrEvent;
- } else if (blockedNonprocessed.isEmpty() && inputFinished) {
- return endOfStreamEvent;
- } else {
- // If no non-processed, get new from input
- while (true) {
- if (!inputFinished) {
- // We read the next buffer from the inputgate
- bufferOrEvent = inputGate.getNextBufferOrEvent();
-
- if (!bufferOrEvent.isBuffer()
- && bufferOrEvent.getEvent() instanceof EndOfPartitionEvent) {
- if (inputGate.isFinished()) {
- // store the event for later if the channel is
- // closed
- endOfStreamEvent = bufferOrEvent;
- inputFinished = true;
- }
-
- } else {
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
- // If channel blocked we just store it
- blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent,
- bufferSpiller, spillReader));
- } else {
- return bufferOrEvent;
- }
- }
- } else {
- actOnAllBlocked();
- return getNextNonBlocked();
- }
- }
- }
- }
-
- /**
- * Blocks the given channel index, from which a barrier has been received.
- *
- * @param channelIndex
- * The channel index to block.
- */
- protected void blockChannel(int channelIndex) {
- if (!blockedChannels.contains(channelIndex)) {
- blockedChannels.add(channelIndex);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Channel blocked with index: " + channelIndex);
- }
- if (isAllBlocked()) {
- actOnAllBlocked();
- }
-
- } else {
- throw new RuntimeException("Tried to block an already blocked channel");
- }
- }
-
- /**
- * Releases the blocks on all channels.
- *
- * @throws IOException
- */
- protected void releaseBlocks() {
- if (!nonprocessed.isEmpty()) {
- // sanity check
- throw new RuntimeException("Error in barrier buffer logic");
- }
- nonprocessed = blockedNonprocessed;
- blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
-
- try {
- spillReader.setSpillFile(bufferSpiller.getSpillFile());
- bufferSpiller.resetSpillFile();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- blockedChannels.clear();
- superstepStarted = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("All barriers received, blocks released");
- }
- }
-
- /**
- * Method that is executed once the barrier has been received from all
- * channels.
- */
- protected void actOnAllBlocked() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Publishing barrier to the vertex");
- }
-
- if (currentSuperstep != null) {
- reader.publish(currentSuperstep);
- }
-
- releaseBlocks();
- }
-
- /**
- * Processes a streaming superstep event
- *
- * @param bufferOrEvent
- * The BufferOrEvent containing the event
- */
- public void processSuperstep(BufferOrEvent bufferOrEvent) {
- StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
- if (!superstepStarted) {
- startSuperstep(superstep);
- }
- blockChannel(bufferOrEvent.getChannelIndex());
- }
-
- public void cleanup() throws IOException {
- bufferSpiller.close();
- File spillfile1 = bufferSpiller.getSpillFile();
- if (spillfile1 != null) {
- spillfile1.delete();
- }
-
- spillReader.close();
- File spillfile2 = spillReader.getSpillFile();
- if (spillfile2 != null) {
- spillfile2.delete();
- }
- }
-
- public String toString() {
- return nonprocessed.toString() + blockedNonprocessed.toString();
- }
-
- public boolean isEmpty() {
- return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
deleted file mode 100644
index 3ee2508..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BlockingQueueBroker.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
- /**
- * Singleton instance
- */
- private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
- private BlockingQueueBroker() {
- }
-
- /**
- * retrieve singleton instance
- */
- public static Broker<BlockingQueue<StreamRecord>> instance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
deleted file mode 100644
index b028ea7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.util.Random;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.StringUtils;
-
-public class BufferSpiller {
-
- protected static Random rnd = new Random();
-
- private File spillFile;
- protected FileChannel spillingChannel;
- private String tempDir;
-
- public BufferSpiller() throws IOException {
- String tempDirString = GlobalConfiguration.getString(
- ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
- String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
-
- tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
-
- createSpillingChannel();
- }
-
- /**
- * Dumps the contents of the buffer to disk and recycles the buffer.
- */
- public void spill(Buffer buffer) throws IOException {
- try {
- spillingChannel.write(buffer.getNioBuffer());
- buffer.recycle();
- } catch (IOException e) {
- close();
- throw new IOException(e);
- }
-
- }
-
- @SuppressWarnings("resource")
- private void createSpillingChannel() throws IOException {
- this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
- this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
- }
-
- private static String randomString(Random random) {
- final byte[] bytes = new byte[20];
- random.nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
-
- public void close() throws IOException {
- if (spillingChannel != null && spillingChannel.isOpen()) {
- spillingChannel.close();
- }
- }
-
- public void resetSpillFile() throws IOException {
- close();
- createSpillingChannel();
- }
-
- public File getSpillFile() {
- return spillFile;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
deleted file mode 100644
index ed90c03..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
-
-/**
- * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
- * input types.
- */
-public class CoReaderIterator<T1, T2> {
-
- private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
- // source
-
- protected final ReusingDeserializationDelegate<T1> delegate1;
- protected final ReusingDeserializationDelegate<T2> delegate2;
-
- public CoReaderIterator(
- CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
- TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
- this.reader = reader;
- this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
- this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
- }
-
- public int next(T1 target1, T2 target2) throws IOException {
- this.delegate1.setInstance(target1);
- this.delegate2.setInstance(target2);
-
- try {
- return this.reader.getNextRecord(this.delegate1, this.delegate2);
-
- } catch (InterruptedException e) {
- throw new IOException("Reader interrupted.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
deleted file mode 100644
index 25cb25d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
- AbstractReader implements EventListener<InputGate>, StreamingReader {
-
- private final InputGate bufferReader1;
-
- private final InputGate bufferReader2;
-
- private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
-
- private LinkedList<Integer> processed = new LinkedList<Integer>();
-
- private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
-
- private RecordDeserializer<T1> reader1currentRecordDeserializer;
-
- private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
-
- private RecordDeserializer<T2> reader2currentRecordDeserializer;
-
- // 0 => none, 1 => reader (T1), 2 => reader (T2)
- private int currentReaderIndex;
-
- private boolean hasRequestedPartitions;
-
- protected CoBarrierBuffer barrierBuffer1;
- protected CoBarrierBuffer barrierBuffer2;
-
- public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
- super(new UnionInputGate(inputgate1, inputgate2));
-
- this.bufferReader1 = inputgate1;
- this.bufferReader2 = inputgate2;
-
- this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
- .getNumberOfInputChannels()];
- this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
- .getNumberOfInputChannels()];
-
- for (int i = 0; i < reader1RecordDeserializers.length; i++) {
- reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
- }
-
- for (int i = 0; i < reader2RecordDeserializers.length; i++) {
- reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
- }
-
- inputgate1.registerListener(this);
- inputgate2.registerListener(this);
-
- barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
- barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
-
- barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
- barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
- }
-
- public void requestPartitionsOnce() throws IOException, InterruptedException {
- if (!hasRequestedPartitions) {
- bufferReader1.requestPartitions();
- bufferReader2.requestPartitions();
-
- hasRequestedPartitions = true;
- }
- }
-
- @SuppressWarnings("unchecked")
- protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
-
- requestPartitionsOnce();
-
- while (true) {
- if (currentReaderIndex == 0) {
- if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
- return 0;
- }
-
- currentReaderIndex = getNextReaderIndexBlocking();
-
- }
-
- if (currentReaderIndex == 1) {
- while (true) {
- if (reader1currentRecordDeserializer != null) {
- RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
- .getNextRecord(target1);
-
- if (result.isBufferConsumed()) {
- reader1currentRecordDeserializer.getCurrentBuffer().recycle();
- reader1currentRecordDeserializer = null;
-
- currentReaderIndex = 0;
- }
-
- if (result.isFullRecord()) {
- return 1;
- }
- } else {
-
- final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
-
- if (boe.isBuffer()) {
- reader1currentRecordDeserializer = reader1RecordDeserializers[boe
- .getChannelIndex()];
- reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
- } else if (boe.getEvent() instanceof StreamingSuperstep) {
- barrierBuffer1.processSuperstep(boe);
- currentReaderIndex = 0;
-
- break;
- } else if (handleEvent(boe.getEvent())) {
- currentReaderIndex = 0;
-
- break;
- }
- }
- }
- } else if (currentReaderIndex == 2) {
- while (true) {
- if (reader2currentRecordDeserializer != null) {
- RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
- .getNextRecord(target2);
-
- if (result.isBufferConsumed()) {
- reader2currentRecordDeserializer.getCurrentBuffer().recycle();
- reader2currentRecordDeserializer = null;
-
- currentReaderIndex = 0;
- }
-
- if (result.isFullRecord()) {
- return 2;
- }
- } else {
- final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
-
- if (boe.isBuffer()) {
- reader2currentRecordDeserializer = reader2RecordDeserializers[boe
- .getChannelIndex()];
- reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
- } else if (boe.getEvent() instanceof StreamingSuperstep) {
- barrierBuffer2.processSuperstep(boe);
- currentReaderIndex = 0;
-
- break;
- } else if (handleEvent(boe.getEvent())) {
- currentReaderIndex = 0;
-
- break;
- }
- }
- }
- } else {
- throw new IllegalStateException("Bug: unexpected current reader index.");
- }
- }
- }
-
- protected int getNextReaderIndexBlocking() throws InterruptedException {
-
- Integer nextIndex = 0;
-
- while (processed.contains(nextIndex = availableRecordReaders.take())) {
- processed.remove(nextIndex);
- }
-
- if (nextIndex == 1) {
- if (barrierBuffer1.isAllBlocked()) {
- availableRecordReaders.addFirst(1);
- processed.add(2);
- return 2;
- } else {
- return 1;
- }
- } else {
- if (barrierBuffer2.isAllBlocked()) {
- availableRecordReaders.addFirst(2);
- processed.add(1);
- return 1;
- } else {
- return 2;
- }
-
- }
-
- }
-
- // ------------------------------------------------------------------------
- // Data availability notifications
- // ------------------------------------------------------------------------
-
- @Override
- public void onEvent(InputGate bufferReader) {
- addToAvailable(bufferReader);
- }
-
- protected void addToAvailable(InputGate bufferReader) {
- if (bufferReader == bufferReader1) {
- availableRecordReaders.add(1);
- } else if (bufferReader == bufferReader2) {
- availableRecordReaders.add(2);
- }
- }
-
- public void clearBuffers() {
- for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
- Buffer buffer = deserializer.getCurrentBuffer();
- if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
- }
- }
- }
-
- private class CoBarrierBuffer extends BarrierBuffer {
-
- private CoBarrierBuffer otherBuffer;
-
- public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
- super(inputGate, reader);
- }
-
- public void setOtherBarrierBuffer(CoBarrierBuffer other) {
- this.otherBuffer = other;
- }
-
- @Override
- protected void actOnAllBlocked() {
- if (otherBuffer.isAllBlocked()) {
- super.actOnAllBlocked();
- otherBuffer.releaseBlocks();
- }
- }
-
- }
-
- public void cleanup() throws IOException {
- try {
- barrierBuffer1.cleanup();
- } finally {
- barrierBuffer2.cleanup();
- }
-
- }
-}