You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:39 UTC
[06/12] [streaming] Streaming jobgraph and vertex refactor to match
recent runtime changes
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
deleted file mode 100644
index 0477afa..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/InputHandler.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.MutableReader;
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class InputHandler<IN> {
- private StreamRecordSerializer<IN> inputSerializer = null;
- private MutableObjectIterator<StreamRecord<IN>> inputIter;
- private MutableReader<IOReadableWritable> inputs;
-
- private AbstractStreamComponent streamComponent;
- private StreamConfig configuration;
-
- public InputHandler(AbstractStreamComponent streamComponent) {
- this.streamComponent = streamComponent;
- this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
- try {
- setConfigInputs();
- } catch (Exception e) {
- throw new StreamComponentException("Cannot register inputs for "
- + getClass().getSimpleName(), e);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- protected void setConfigInputs() throws StreamComponentException {
- setDeserializer();
-
- int numberOfInputs = configuration.getNumberOfInputs();
-
- if (numberOfInputs < 2) {
-
- inputs = new MutableRecordReader<IOReadableWritable>(streamComponent);
-
- } else {
- MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
-
- for (int i = 0; i < numberOfInputs; i++) {
- recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamComponent);
- }
- inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
- }
-
- inputIter = createInputIterator();
- }
-
- private void setDeserializer() {
- TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
- inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
- }
-
- private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs, inputSerializer);
- return iter;
- }
-
- protected static <T> MutableObjectIterator<StreamRecord<T>> staticCreateInputIterator(
- MutableReader<?> inputReader, TypeSerializer<?> serializer) {
-
- // generic data type serialization
- @SuppressWarnings("unchecked")
- MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
- @SuppressWarnings({ "unchecked", "rawtypes" })
- final MutableObjectIterator<StreamRecord<T>> iter = new ReaderIterator(reader, serializer);
- return iter;
- }
-
- public StreamRecordSerializer<IN> getInputSerializer() {
- return inputSerializer;
- }
-
- public MutableObjectIterator<StreamRecord<IN>> getInputIter() {
- return inputIter;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
deleted file mode 100644
index dabb871..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/OutputHandler.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.StreamRecordWriter;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
- private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
- private AbstractStreamComponent streamComponent;
- private StreamConfig configuration;
-
- private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
- private StreamCollector<OUT> collector;
- private long bufferTimeout;
-
- TypeInformation<OUT> outTypeInfo = null;
- StreamRecordSerializer<OUT> outSerializer = null;
- SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
-
- public OutputHandler(AbstractStreamComponent streamComponent) {
- this.streamComponent = streamComponent;
- this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
- this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
-
- try {
- setConfigOutputs();
- } catch (StreamComponentException e) {
- throw new StreamComponentException("Cannot register outputs for "
- + streamComponent.getClass().getSimpleName(), e);
- }
- }
-
- public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
- return outputs;
- }
-
- private void setConfigOutputs() {
- setSerializers();
- setCollector();
-
- int numberOfOutputs = configuration.getNumberOfOutputs();
- bufferTimeout = configuration.getBufferTimeout();
-
- for (int i = 0; i < numberOfOutputs; i++) {
- setPartitioner(i, outputs);
- }
- }
-
- private StreamCollector<OUT> setCollector() {
- if (streamComponent.configuration.getDirectedEmit()) {
- OutputSelector<OUT> outputSelector = streamComponent.configuration.getOutputSelector();
-
- collector = new DirectedStreamCollector<OUT>(streamComponent.getInstanceID(),
- outSerializationDelegate, outputSelector);
- } else {
- collector = new StreamCollector<OUT>(streamComponent.getInstanceID(),
- outSerializationDelegate);
- }
- return collector;
- }
-
- public StreamCollector<OUT> getCollector() {
- return collector;
- }
-
- void setSerializers() {
- outTypeInfo = configuration.getTypeInfoOut1();
- outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
- outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
- outSerializationDelegate.setInstance(outSerializer.createInstance());
- }
-
- void setPartitioner(int outputNumber,
- List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
- StreamPartitioner<OUT> outputPartitioner = null;
-
- try {
- outputPartitioner = configuration.getPartitioner(outputNumber);
-
- } catch (Exception e) {
- throw new StreamComponentException("Cannot deserialize partitioner for "
- + streamComponent.getName() + " with " + outputNumber + " outputs", e);
- }
-
- RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
-
- if (bufferTimeout > 0) {
- output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
- streamComponent, outputPartitioner, bufferTimeout);
- } else {
- output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamComponent,
- outputPartitioner);
- }
-
- outputs.add(output);
- List<String> outputName = configuration.getOutputName(outputNumber);
- boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
-
- if (collector != null) {
- collector.addOutput(output, outputName, isSelectAllOutput);
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
- .getSimpleName(), outputNumber);
- }
- }
-
- public void flushOutputs() throws IOException, InterruptedException {
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.flush();
- }
- }
-
- public void initializeOutputSerializers() {
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
- output.initializeSerializers();
- }
- }
-
- long startTime;
-
- public void invokeUserFunction(String componentTypeName, StreamInvokable<OUT> userInvokable)
- throws IOException, InterruptedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} {} invoked with instance id {}", componentTypeName,
- streamComponent.getName(), streamComponent.getInstanceID());
- }
-
- initializeOutputSerializers();
-
- try {
- streamComponent.invokeUserFunction(userInvokable);
- } catch (Exception e) {
- flushOutputs();
- throw new RuntimeException(e);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
- streamComponent.getName(), streamComponent.getInstanceID());
- }
-
- flushOutputs();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
deleted file mode 100644
index 240c9ba..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-/**
- * An exception that is thrown by the stream components when encountering an
- * illegal condition.
- */
-public class StreamComponentException extends RuntimeException {
-
- /**
- * Serial version UID for serialization interoperability.
- */
- private static final long serialVersionUID = 8392043527067472439L;
-
- /**
- * Creates a compiler exception with no message and no cause.
- */
- public StreamComponentException() {
- }
-
- /**
- * Creates a compiler exception with the given message and no cause.
- *
- * @param message
- * The message for the exception.
- */
- public StreamComponentException(String message) {
- super(message);
- }
-
- /**
- * Creates a compiler exception with the given cause and no message.
- *
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamComponentException(Throwable cause) {
- super(cause);
- }
-
- /**
- * Creates a compiler exception with the given message and cause.
- *
- * @param message
- * The message for the exception.
- * @param cause
- * The <tt>Throwable</tt> that caused this exception.
- */
- public StreamComponentException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
deleted file mode 100755
index 32fc5f9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-import org.apache.flink.util.StringUtils;
-
-public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
-
- private InputHandler<IN> inputHandler;
-
- private String iterationId;
- @SuppressWarnings("rawtypes")
- private BlockingQueue<StreamRecord> dataChannel;
- private long iterationWaitTime;
- private boolean shouldWait;
-
- public StreamIterationSink() {
- }
-
- @Override
- public void setInputsOutputs() {
- try {
- inputHandler = new InputHandler<IN>(this);
-
- iterationId = configuration.getIterationId();
- iterationWaitTime = configuration.getIterationWaitTime();
- shouldWait = iterationWaitTime > 0;
- dataChannel = BlockingQueueBroker.instance().get(iterationId);
- } catch (Exception e) {
- throw new StreamComponentException(String.format(
- "Cannot register inputs of StreamIterationSink %s", iterationId), e);
- }
- }
-
- @Override
- public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SINK {} invoked", getName());
- }
-
- forwardRecords();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SINK {} invoke finished", getName());
- }
- }
-
- protected void forwardRecords() throws Exception {
- StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
- while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
- if (!pushToQueue(reuse)) {
- break;
- }
- // TODO: Fix object reuse for iteration
- reuse = inputHandler.getInputSerializer().createInstance();
- }
- }
-
- private boolean pushToQueue(StreamRecord<IN> record) {
- try {
- if (shouldWait) {
- return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
- } else {
- dataChannel.put(record);
- return true;
- }
- } catch (InterruptedException e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
- StringUtils.stringifyException(e));
- }
- return false;
- }
- }
-
- @Override
- protected void setInvokable() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
deleted file mode 100755
index ce5687a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.BlockingQueueBroker;
-
-public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSource.class);
-
- private OutputHandler<OUT> outputHandler;
-
- private static int numSources;
- private String iterationId;
- @SuppressWarnings("rawtypes")
- private BlockingQueue<StreamRecord> dataChannel;
- private long iterationWaitTime;
- private boolean shouldWait;
-
- @SuppressWarnings("rawtypes")
- public StreamIterationSource() {
- numSources = newComponent();
- instanceID = numSources;
- dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
- }
-
- @Override
- public void setInputsOutputs() {
- outputHandler = new OutputHandler<OUT>(this);
-
- iterationId = configuration.getIterationId();
- iterationWaitTime = configuration.getIterationWaitTime();
- shouldWait = iterationWaitTime > 0;
-
- try {
- BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
- } catch (Exception e) {
-
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
- }
-
- outputHandler.initializeOutputSerializers();
-
- StreamRecord<OUT> nextRecord;
-
- while (true) {
- if (shouldWait) {
- nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
- } else {
- nextRecord = dataChannel.take();
- }
- if (nextRecord == null) {
- break;
- }
- for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
- .getOutputs()) {
- outputHandler.outSerializationDelegate.setInstance(nextRecord);
- output.emit(outputHandler.outSerializationDelegate);
- }
- }
-
- outputHandler.flushOutputs();
- }
-
- @Override
- protected void setInvokable() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
deleted file mode 100644
index e2982bf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamSink<IN> extends AbstractStreamComponent {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
-
- private InputHandler<IN> inputHandler;
-
- private StreamOperatorInvokable<IN, IN> userInvokable;
-
- public StreamSink() {
- userInvokable = null;
- }
-
- @Override
- public void setInputsOutputs() {
- inputHandler = new InputHandler<IN>(this);
- }
-
- @Override
- protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
- userInvokable.initialize(null, inputHandler.getInputIter(), inputHandler.getInputSerializer(),
- isMutable);
- }
-
- @Override
- public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SINK {} invoked", getName());
- }
-
- invokeUserFunction(userInvokable);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("SINK {} invoke finished", getName());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
deleted file mode 100644
index 11f372a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSource.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.SourceInvokable;
-
-public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent {
-
- protected OutputHandler<OUT> outputHandler;
-
- private SourceInvokable<OUT> sourceInvokable;
-
- private static int numSources;
-
- public StreamSource() {
- sourceInvokable = null;
- numSources = newComponent();
- instanceID = numSources;
- }
-
- @Override
- public void setInputsOutputs() {
- outputHandler = new OutputHandler<OUT>(this);
- }
-
- @Override
- protected void setInvokable() {
- sourceInvokable = configuration.getUserInvokable();
- sourceInvokable.setCollector(outputHandler.getCollector());
- }
-
- @Override
- public void invoke() throws Exception {
- outputHandler.invokeUserFunction("SOURCE", sourceInvokable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
deleted file mode 100644
index 6824d09..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
-
-public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
-
- private InputHandler<IN> inputHandler;
- private OutputHandler<OUT> outputHandler;
-
- private StreamOperatorInvokable<IN, OUT> userInvokable;
-
- private static int numTasks;
-
- public StreamTask() {
- userInvokable = null;
- numTasks = newComponent();
- instanceID = numTasks;
- }
-
- @Override
- public void setInputsOutputs() {
- inputHandler = new InputHandler<IN>(this);
- outputHandler = new OutputHandler<OUT>(this);
- }
-
- @Override
- protected void setInvokable() {
- userInvokable = configuration.getUserInvokable();
- userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
- inputHandler.getInputSerializer(), isMutable);
- }
-
- @Override
- public void invoke() throws Exception {
- outputHandler.invokeUserFunction("TASK", userInvokable);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
new file mode 100644
index 0000000..5a6519d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.streaming.io.CoRecordReader;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Stream vertex with two differently typed inputs ({@code IN1} and
+ * {@code IN2}). Both inputs are read through one {@link CoRecordReader} and
+ * passed, via a {@link CoReaderIterator}, to the user's {@link CoInvokable},
+ * which emits {@code OUT} records through the {@link OutputHandler}.
+ */
+public class CoStreamVertex<IN1, IN2, OUT> extends
+        StreamVertex<IN1,OUT> {
+
+    private OutputHandler<OUT> outputHandler;
+
+    protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
+    protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
+
+    // NOTE(review): these two iterators are never assigned in this class.
+    MutableObjectIterator<StreamRecord<IN1>> inputIter1;
+    MutableObjectIterator<StreamRecord<IN2>> inputIter2;
+
+    CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
+    CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+
+    private CoInvokable<IN1, IN2, OUT> userInvokable;
+    private static int numTasks;
+
+    /**
+     * Creates the vertex and takes the next vertex number as its instance id.
+     */
+    public CoStreamVertex() {
+        userInvokable = null;
+        numTasks = newVertex();
+        instanceID = numTasks;
+    }
+
+    /**
+     * Builds one record serializer per input from the type information stored
+     * in the stream configuration.
+     */
+    private void setDeserializers() {
+        TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
+        inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
+
+        TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
+        inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
+    }
+
+    /**
+     * Creates the output handler, registers the input readers and wraps the
+     * combined reader in the iterator handed to the user invokable.
+     */
+    @Override
+    public void setInputsOutputs() {
+        outputHandler = new OutputHandler<OUT>(this);
+
+        setConfigInputs();
+
+        coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
+                inputDeserializer1, inputDeserializer2);
+    }
+
+    /**
+     * Loads the user invokable from the configuration and initializes it with
+     * the collector, the co-iterator and both input serializers.
+     */
+    @Override
+    protected void setInvokable() {
+        userInvokable = configuration.getUserInvokable();
+        userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
+                inputDeserializer2, isMutable);
+    }
+
+    /**
+     * Creates one record reader per configured input, sorts the readers into
+     * the first or second input group according to the configured input type,
+     * and combines both groups into {@link #coReader}.
+     *
+     * @throws StreamVertexException
+     *             if an input is configured with a type other than 1 or 2
+     */
+    protected void setConfigInputs() throws StreamVertexException {
+        setDeserializers();
+
+        int numberOfInputs = configuration.getNumberOfInputs();
+
+        ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
+        ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
+
+        for (int i = 0; i < numberOfInputs; i++) {
+            int inputType = configuration.getInputType(i);
+            switch (inputType) {
+            case 1:
+                inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
+                        this));
+                break;
+            case 2:
+                inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
+                        this));
+                break;
+            default:
+                // Use the declared exception type; it is still a
+                // RuntimeException, so existing handlers keep working.
+                throw new StreamVertexException("Invalid input type number: " + inputType);
+            }
+        }
+
+        coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
+                inputList1, inputList2);
+    }
+
+    /**
+     * Runs the user invokable through the output handler under the
+     * "CO-TASK" component label.
+     */
+    @Override
+    public void invoke() throws Exception {
+        outputHandler.invokeUserFunction("CO-TASK", userInvokable);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
new file mode 100644
index 0000000..17d2ae5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.api.MutableReader;
+import org.apache.flink.runtime.io.network.api.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Sets up the input side of a {@link StreamVertex}: the record serializer,
+ * the record reader(s) and the iterator over incoming {@link StreamRecord}s.
+ */
+public class InputHandler<IN> {
+    private StreamRecordSerializer<IN> inputSerializer = null;
+    private MutableObjectIterator<StreamRecord<IN>> inputIter;
+    private MutableReader<IOReadableWritable> inputs;
+
+    private final StreamVertex<IN,?> streamVertex;
+    private final StreamConfig configuration;
+
+    /**
+     * Creates the handler and immediately registers the inputs of the given
+     * vertex.
+     *
+     * @param streamComponent
+     *            The vertex whose inputs are registered.
+     * @throws StreamVertexException
+     *             if registering the inputs fails; the original failure is
+     *             kept as the cause.
+     */
+    public InputHandler(StreamVertex<IN,?> streamComponent) {
+        this.streamVertex = streamComponent;
+        this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
+        try {
+            setConfigInputs();
+        } catch (Exception e) {
+            // Name the failing vertex, not this handler class, so the message
+            // identifies the component (same as OutputHandler).
+            throw new StreamVertexException("Cannot register inputs for "
+                    + streamComponent.getClass().getSimpleName(), e);
+        }
+
+    }
+
+    /**
+     * Creates the serializer and, if the configuration reports at least one
+     * input, the reader and the input iterator. A single input gets a plain
+     * record reader; several inputs are combined in a union reader. With no
+     * inputs, the reader and the iterator stay {@code null}.
+     */
+    @SuppressWarnings("unchecked")
+    protected void setConfigInputs() throws StreamVertexException {
+        setDeserializer();
+
+        int numberOfInputs = configuration.getNumberOfInputs();
+        if (numberOfInputs > 0) {
+
+            if (numberOfInputs < 2) {
+
+                inputs = new MutableRecordReader<IOReadableWritable>(streamVertex);
+
+            } else {
+                MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
+
+                for (int i = 0; i < numberOfInputs; i++) {
+                    recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamVertex);
+                }
+                inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
+            }
+
+            inputIter = createInputIterator();
+        }
+    }
+
+    /**
+     * Builds the input serializer from the configured input type information;
+     * leaves it {@code null} when no type information is configured.
+     */
+    private void setDeserializer() {
+        TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
+        if (inTupleTypeInfo != null) {
+            inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
+        }
+    }
+
+    /**
+     * Wraps the registered reader and serializer in a {@link ReaderIterator}.
+     */
+    private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,
+                inputSerializer);
+        return iter;
+    }
+
+    /**
+     * Static variant of the iterator factory for callers that bring their own
+     * reader and serializer.
+     *
+     * @param inputReader
+     *            The reader to iterate over.
+     * @param serializer
+     *            The serializer used to deserialize the records.
+     * @return An iterator over the records of the reader.
+     */
+    protected static <T> MutableObjectIterator<StreamRecord<T>> staticCreateInputIterator(
+            MutableReader<?> inputReader, TypeSerializer<?> serializer) {
+
+        // generic data type serialization
+        @SuppressWarnings("unchecked")
+        MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        final MutableObjectIterator<StreamRecord<T>> iter = new ReaderIterator(reader, serializer);
+        return iter;
+    }
+
+    /**
+     * @return The input serializer, or {@code null} if no input type
+     *         information was configured.
+     */
+    public StreamRecordSerializer<IN> getInputSerializer() {
+        return inputSerializer;
+    }
+
+    /**
+     * @return The input iterator, or {@code null} if the vertex has no inputs.
+     */
+    public MutableObjectIterator<StreamRecord<IN>> getInputIter() {
+        return inputIter;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
new file mode 100644
index 0000000..d3f75dd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.collector.StreamCollector;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.StreamRecordWriter;
+import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sets up the output side of a {@link StreamVertex}: the output serializer,
+ * the collector and one record writer per configured output. It also drives
+ * the execution of the user invokable and flushes the writers afterwards.
+ */
+public class OutputHandler<OUT> {
+    private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
+
+    private StreamVertex<?,OUT> streamVertex;
+    private StreamConfig configuration;
+
+    private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+    private StreamCollector<OUT> collector;
+    private long bufferTimeout;
+
+    TypeInformation<OUT> outTypeInfo = null;
+    StreamRecordSerializer<OUT> outSerializer = null;
+    SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
+
+    /**
+     * Creates the handler and immediately registers the outputs of the given
+     * vertex.
+     *
+     * @param streamComponent
+     *            The vertex whose outputs are registered.
+     * @throws StreamVertexException
+     *             if registering the outputs fails.
+     */
+    public OutputHandler(StreamVertex<?,OUT> streamComponent) {
+        this.streamVertex = streamComponent;
+        this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+        this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
+
+        try {
+            setConfigOutputs();
+        } catch (StreamVertexException e) {
+            throw new StreamVertexException("Cannot register outputs for "
+                    + streamComponent.getClass().getSimpleName(), e);
+        }
+    }
+
+    /**
+     * @return The record writers created for this vertex, in output order.
+     */
+    public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
+        return outputs;
+    }
+
+    /**
+     * Creates serializer and collector first, then one writer per configured
+     * output. The collector must exist before the writers so that each writer
+     * can be attached to it.
+     */
+    private void setConfigOutputs() {
+        setSerializers();
+        setCollector();
+
+        int numberOfOutputs = configuration.getNumberOfOutputs();
+        bufferTimeout = configuration.getBufferTimeout();
+
+        for (int i = 0; i < numberOfOutputs; i++) {
+            setPartitioner(i, outputs);
+        }
+    }
+
+    /**
+     * Creates a directed collector (with the configured output selector) when
+     * directed emit is enabled, and a plain collector otherwise.
+     *
+     * @return The created collector.
+     */
+    private StreamCollector<OUT> setCollector() {
+        if (streamVertex.configuration.getDirectedEmit()) {
+            OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
+
+            collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
+                    outSerializationDelegate, outputSelector);
+        } else {
+            collector = new StreamCollector<OUT>(streamVertex.getInstanceID(),
+                    outSerializationDelegate);
+        }
+        return collector;
+    }
+
+    /**
+     * @return The collector that user code emits records through.
+     */
+    public StreamCollector<OUT> getCollector() {
+        return collector;
+    }
+
+    /**
+     * Builds the output serializer and serialization delegate from the
+     * configured output type information; leaves them {@code null} when no
+     * type information is configured.
+     */
+    void setSerializers() {
+        outTypeInfo = configuration.getTypeInfoOut1();
+        if (outTypeInfo != null) {
+            outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+            outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
+            outSerializationDelegate.setInstance(outSerializer.createInstance());
+        }
+    }
+
+    /**
+     * Creates the record writer for one output, using the configured
+     * partitioner, and registers it with the collector.
+     *
+     * @param outputNumber
+     *            Zero-based index of the output being set up.
+     * @param outputs
+     *            The list the created writer is appended to.
+     * @throws StreamVertexException
+     *             if the partitioner cannot be read from the configuration.
+     */
+    void setPartitioner(int outputNumber,
+            List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
+        StreamPartitioner<OUT> outputPartitioner = null;
+
+        try {
+            outputPartitioner = configuration.getPartitioner(outputNumber);
+
+        } catch (Exception e) {
+            // Note: the number printed here is the output index, not a count.
+            throw new StreamVertexException("Cannot deserialize partitioner for "
+                    + streamVertex.getName() + " with " + outputNumber + " outputs", e);
+        }
+
+        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+
+        // A positive buffer timeout selects the stream writer that takes the
+        // timeout; otherwise the plain record writer is used.
+        if (bufferTimeout > 0) {
+            output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
+                    streamVertex, outputPartitioner, bufferTimeout);
+        } else {
+            output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
+                    outputPartitioner);
+        }
+
+        outputs.add(output);
+        List<String> outputName = configuration.getOutputName(outputNumber);
+        boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
+
+        if (collector != null) {
+            collector.addOutput(output, outputName, isSelectAllOutput);
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Partitioner set: {} with {} outputs", outputPartitioner.getClass()
+                    .getSimpleName(), outputNumber);
+        }
+    }
+
+    /**
+     * Flushes every registered record writer.
+     */
+    public void flushOutputs() throws IOException, InterruptedException {
+        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+            output.flush();
+        }
+    }
+
+    /**
+     * Initializes the serializers of every registered record writer.
+     */
+    public void initializeOutputSerializers() {
+        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
+            output.initializeSerializers();
+        }
+    }
+
+    long startTime;
+
+    /**
+     * Runs the user invokable of the vertex and flushes all outputs
+     * afterwards, also when the user code fails.
+     *
+     * @param componentTypeName
+     *            Label used in the debug log messages.
+     * @param userInvokable
+     *            The invokable to execute.
+     * @throws RuntimeException
+     *             wrapping any exception thrown by the user invokable.
+     * @throws IOException
+     *             if the final flush after a successful run fails.
+     * @throws InterruptedException
+     *             if the final flush after a successful run is interrupted.
+     */
+    public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
+            throws IOException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} {} invoked with instance id {}", componentTypeName,
+                    streamVertex.getName(), streamVertex.getInstanceID());
+        }
+
+        initializeOutputSerializers();
+
+        try {
+            streamVertex.invokeUserFunction(userInvokable);
+        } catch (Exception e) {
+            // Best-effort flush: a failure here must not replace the
+            // exception that made the user function fail.
+            try {
+                flushOutputs();
+            } catch (Exception flushFailure) {
+                if (flushFailure instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+                LOG.error("Flushing outputs of " + streamVertex.getName()
+                        + " failed after the user function threw an exception", flushFailure);
+            }
+            throw new RuntimeException(e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
+                    streamVertex.getName(), streamVertex.getInstanceID());
+        }
+
+        flushOutputs();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
new file mode 100755
index 0000000..4dfecb1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Head of a streaming iteration. It publishes a blocking queue under the
+ * iteration id through the {@link BlockingQueueBroker} and forwards every
+ * record taken from that queue to all of its outputs.
+ */
+public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
+
+    private OutputHandler<OUT> outputHandler;
+
+    private static int numSources;
+    private String iterationId;
+    @SuppressWarnings("rawtypes")
+    private BlockingQueue<StreamRecord> dataChannel;
+    private long iterationWaitTime;
+    private boolean shouldWait;
+
+    /**
+     * Creates the head with a queue of capacity one and takes the next vertex
+     * number as its instance id.
+     */
+    @SuppressWarnings("rawtypes")
+    public StreamIterationHead() {
+        numSources = newVertex();
+        instanceID = numSources;
+        dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
+    }
+
+    /**
+     * Creates the output handler, reads the iteration settings and hands the
+     * data channel to the broker under the iteration id.
+     */
+    @Override
+    public void setInputsOutputs() {
+        outputHandler = new OutputHandler<OUT>(this);
+
+        iterationId = configuration.getIterationId();
+        iterationWaitTime = configuration.getIterationWaitTime();
+        shouldWait = iterationWaitTime > 0;
+
+        try {
+            BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
+        } catch (Exception e) {
+            // Stays best-effort as before, but a failed hand-in is no longer
+            // silent: without it the tail may not see this channel.
+            LOG.warn("Could not hand in the data channel for iteration " + iterationId, e);
+        }
+    }
+
+    /**
+     * Forwards records from the data channel to every output. With a positive
+     * wait time the loop ends when no record arrives within that time;
+     * otherwise it blocks until a record is available.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void invoke() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
+        }
+
+        outputHandler.initializeOutputSerializers();
+
+        StreamRecord<OUT> nextRecord;
+
+        while (true) {
+            if (shouldWait) {
+                nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+            } else {
+                nextRecord = dataChannel.take();
+            }
+            // poll() returns null on timeout, which ends the iteration.
+            if (nextRecord == null) {
+                break;
+            }
+            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
+                    .getOutputs()) {
+                outputHandler.outSerializationDelegate.setInstance(nextRecord);
+                output.emit(outputHandler.outSerializationDelegate);
+            }
+        }
+
+        outputHandler.flushOutputs();
+    }
+
+    /**
+     * The iteration head runs no user invokable, so there is nothing to set.
+     */
+    @Override
+    protected void setInvokable() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
new file mode 100755
index 0000000..b603686
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tail of a streaming iteration. It reads the records arriving at its input
+ * and pushes them into the blocking queue that the {@link BlockingQueueBroker}
+ * holds for the iteration id.
+ */
+public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
+
+    private InputHandler<IN> inputHandler;
+
+    private String iterationId;
+    @SuppressWarnings("rawtypes")
+    private BlockingQueue<StreamRecord> dataChannel;
+    private long iterationWaitTime;
+    private boolean shouldWait;
+
+    public StreamIterationTail() {
+    }
+
+    /**
+     * Creates the input handler, reads the iteration settings and fetches the
+     * data channel for the iteration id from the broker.
+     *
+     * @throws StreamVertexException
+     *             if any of these steps fails.
+     */
+    @Override
+    public void setInputsOutputs() {
+        try {
+            inputHandler = new InputHandler<IN>(this);
+
+            iterationId = configuration.getIterationId();
+            iterationWaitTime = configuration.getIterationWaitTime();
+            shouldWait = iterationWaitTime > 0;
+            dataChannel = BlockingQueueBroker.instance().get(iterationId);
+        } catch (Exception e) {
+            throw new StreamVertexException(String.format(
+                    "Cannot register inputs of StreamIterationSink %s", iterationId), e);
+        }
+    }
+
+    /**
+     * Forwards all input records to the data channel, logging start and end
+     * at debug level.
+     */
+    @Override
+    public void invoke() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SINK {} invoked", getName());
+        }
+
+        forwardRecords();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SINK {} invoke finished", getName());
+        }
+    }
+
+    /**
+     * Reads records until the input is exhausted or a record cannot be pushed
+     * to the queue. A fresh record instance is created for every read because
+     * the pushed record is now held by the queue.
+     */
+    protected void forwardRecords() throws Exception {
+        StreamRecord<IN> reuse = inputHandler.getInputSerializer().createInstance();
+        while ((reuse = inputHandler.getInputIter().next(reuse)) != null) {
+            if (!pushToQueue(reuse)) {
+                break;
+            }
+            // TODO: Fix object reuse for iteration
+            reuse = inputHandler.getInputSerializer().createInstance();
+        }
+    }
+
+    /**
+     * Pushes one record to the data channel. With a positive wait time the
+     * push gives up after that time; otherwise it blocks until there is room.
+     *
+     * @param record
+     *            The record to push.
+     * @return {@code true} if the record was queued, {@code false} if the push
+     *         timed out or the thread was interrupted.
+     */
+    private boolean pushToQueue(StreamRecord<IN> record) {
+        try {
+            if (shouldWait) {
+                return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+            } else {
+                dataChannel.put(record);
+                return true;
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the caller can still see that the
+            // thread was asked to stop.
+            Thread.currentThread().interrupt();
+            if (LOG.isErrorEnabled()) {
+                // SLF4J only substitutes "{}"; the former "%s" made the
+                // exception text disappear from the message.
+                LOG.error("Pushing back record at iteration {} failed due to: {}", iterationId,
+                        StringUtils.stringifyException(e));
+            }
+            return false;
+        }
+    }
+
+    /**
+     * The iteration tail runs no user invokable, so there is nothing to set.
+     */
+    @Override
+    protected void setInvokable() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
new file mode 100644
index 0000000..d05339a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+
+/**
+ * Invokable for a streaming vertex with one input type and one output type.
+ * Registration reads the stream configuration, creates the input and output
+ * handlers and initializes the user invokable; {@link #invoke()} then runs
+ * the user invokable through the output handler.
+ */
+public class StreamVertex<IN, OUT> extends AbstractInvokable {
+
+    private static int numTasks;
+    private static int numVertices = 0;
+
+    protected StreamConfig configuration;
+    protected int instanceID;
+    protected String name;
+    protected boolean isMutable;
+    protected Object function;
+    protected String functionName;
+
+    private InputHandler<IN> inputHandler;
+    private OutputHandler<OUT> outputHandler;
+    private StreamInvokable<IN, OUT> userInvokable;
+
+    /**
+     * Takes the next vertex number as the instance id. The user invokable
+     * stays {@code null} until {@link #setInvokable()} runs.
+     */
+    public StreamVertex() {
+        final int vertexNumber = newVertex();
+        numTasks = vertexNumber;
+        instanceID = vertexNumber;
+    }
+
+    /**
+     * Advances the shared vertex counter.
+     *
+     * @return The counter value after the increment.
+     */
+    protected static int newVertex() {
+        return ++numVertices;
+    }
+
+    /**
+     * Reads the configuration, then sets up handlers and the user invokable,
+     * in that order.
+     */
+    @Override
+    public void registerInputOutput() {
+        initialize();
+        setInputsOutputs();
+        setInvokable();
+    }
+
+    /**
+     * Loads the stream configuration of this task and caches the vertex name,
+     * the mutability flag and the user function with its name.
+     */
+    protected void initialize() {
+        final StreamConfig config = new StreamConfig(getTaskConfiguration());
+        configuration = config;
+        name = config.getVertexName();
+        isMutable = config.getMutability();
+        functionName = config.getFunctionName();
+        function = config.getFunction();
+    }
+
+    /**
+     * Opens the given invokable with the task configuration, invokes it and
+     * closes it.
+     */
+    protected <T> void invokeUserFunction(StreamInvokable<?,T> userInvokable) throws Exception {
+        userInvokable.open(getTaskConfiguration());
+        userInvokable.invoke();
+        userInvokable.close();
+    }
+
+
+    /**
+     * Creates the input handler first and the output handler second.
+     */
+    public void setInputsOutputs() {
+        inputHandler = new InputHandler<IN>(this);
+        outputHandler = new OutputHandler<OUT>(this);
+    }
+
+    /**
+     * Loads the user invokable from the configuration and initializes it with
+     * the collector, the input iterator and the input serializer.
+     */
+    protected void setInvokable() {
+        userInvokable = configuration.getUserInvokable();
+        userInvokable.initialize(
+                outputHandler.getCollector(),
+                inputHandler.getInputIter(),
+                inputHandler.getInputSerializer(),
+                isMutable);
+    }
+
+    /**
+     * @return The vertex name read from the configuration.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @return The instance id assigned in the constructor.
+     */
+    public int getInstanceID() {
+        return instanceID;
+    }
+
+    /**
+     * Runs the user invokable through the output handler under the "TASK"
+     * component label.
+     */
+    @Override
+    public void invoke() throws Exception {
+        outputHandler.invokeUserFunction("TASK", userInvokable);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
new file mode 100644
index 0000000..ed8b91e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertexException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+/**
+ * An exception that is thrown by the stream vertices when encountering an
+ * illegal condition.
+ */
+public class StreamVertexException extends RuntimeException {
+
+ /**
+ * Serial version UID for serialization interoperability.
+ */
+ private static final long serialVersionUID = 8392043527067472439L;
+
+ /**
+ * Creates a stream vertex exception with no message and no cause.
+ */
+ public StreamVertexException() {
+ }
+
+ /**
+ * Creates a stream vertex exception with the given message and no cause.
+ *
+ * @param message
+ * The message for the exception.
+ */
+ public StreamVertexException(String message) {
+ super(message);
+ }
+
+ /**
+ * Creates a stream vertex exception with the given cause and no message.
+ *
+ * @param cause
+ * The <tt>Throwable</tt> that caused this exception.
+ */
+ public StreamVertexException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Creates a stream vertex exception with the given message and cause.
+ *
+ * @param message
+ * The message for the exception.
+ * @param cause
+ * The <tt>Throwable</tt> that caused this exception.
+ */
+ public StreamVertexException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index ae52e67..535e109 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -18,13 +18,13 @@
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertArrayEquals;
-
+
import java.util.ArrayList;
-
+
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
import org.apache.flink.streaming.util.MockRecordWriterFactory;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
deleted file mode 100755
index 9c5c43f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-
-public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
-
- public ArrayList<Integer> emittedRecords;
-
- public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
- super(inputBase);
- }
-
- public boolean initList() {
- emittedRecords = new ArrayList<Integer>();
- return true;
- }
-
- @Override
- public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
- emittedRecords.add(record.getInstance().getObject().f0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
deleted file mode 100644
index cc341dc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.streamcomponent;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class StreamComponentTest {
-
- private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
-
- public static class MySource implements SourceFunction<Tuple1<Integer>> {
- private static final long serialVersionUID = 1L;
-
- private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
-
- @Override
- public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
- for (int i = 0; i < 10; i++) {
- tuple.f0 = i;
- collector.collect(tuple);
- }
- }
- }
-
- public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
- Integer i = value.f0;
- return new Tuple2<Integer, Integer>(i, i + 1);
- }
- }
-
- public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Tuple2<Integer, Integer> tuple) {
- Integer k = tuple.getField(0);
- Integer v = tuple.getField(1);
- data.put(k, v);
- }
- }
-
- @SuppressWarnings("unused")
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALELISM = 1;
- private static final long MEMORYSIZE = 32;
-
-// @Test
- public void wrongJobGraph() {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
-
- try {
- env.execute();
- fail();
- } catch (Exception e) {
- }
-
- env.fromCollection(Arrays.asList("a", "b"));
-
- try {
- env.execute();
- fail();
- } catch (Exception e) {
- }
-
- try {
- env.fromCollection(null);
- fail();
- } catch (NullPointerException e) {
- }
-
- try {
- env.fromElements();
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.generateSequence(-10, -30);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.setBufferTimeout(-10);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.setExecutionParallelism(-10);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- try {
- env.readTextFile("random/path/that/is/not/valid");
- fail();
- } catch (IllegalArgumentException e) {
- }
- }
-
- private static class CoMap implements CoMapFunction<String, Long, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(String value) {
- return value;
- }
-
- @Override
- public String map2(Long value) {
- return value.toString();
- }
- }
-
- static HashSet<String> resultSet;
-
- private static class SetSink implements SinkFunction<String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(String value) {
- resultSet.add(value);
- }
- }
-
- @Test
- public void coTest() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
-
- DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
- DataStream<Long> generatedSequence = env.generateSequence(0, 3);
-
- fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
-
- resultSet = new HashSet<String>();
- env.execute();
-
- HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
- "2", "3"));
- assertEquals(expectedSet, resultSet);
- }
-
- @Test
- public void runStream() throws Exception {
- LocalStreamEnvironment env = StreamExecutionEnvironment
- .createLocalEnvironment(SOURCE_PARALELISM);
-
- env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
-
- env.executeTest(MEMORYSIZE);
- assertEquals(10, data.keySet().size());
-
- for (Integer k : data.keySet()) {
- assertEquals((Integer) (k + 1), data.get(k));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
new file mode 100755
index 0000000..e91dc8b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/MockRecordWriter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import java.util.ArrayList;
+
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.io.network.api.RecordWriter;
+
+public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
+
+ public ArrayList<Integer> emittedRecords;
+
+ public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
+ super(inputBase);
+ }
+
+ public boolean initList() {
+ emittedRecords = new ArrayList<Integer>();
+ return true;
+ }
+
+ @Override
+ public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
+ emittedRecords.add(record.getInstance().getObject().f0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
new file mode 100644
index 0000000..f470c76
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class StreamVertexTest {
+
+ private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+
+ public static class MySource implements SourceFunction<Tuple1<Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
+
+ @Override
+ public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ tuple.f0 = i;
+ collector.collect(tuple);
+ }
+ }
+ }
+
+ public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Integer> map(Tuple1<Integer> value) throws Exception {
+ Integer i = value.f0;
+ return new Tuple2<Integer, Integer>(i, i + 1);
+ }
+ }
+
+ public static class MySink implements SinkFunction<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple2<Integer, Integer> tuple) {
+ Integer k = tuple.getField(0);
+ Integer v = tuple.getField(1);
+ data.put(k, v);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static final int PARALLELISM = 1;
+ private static final int SOURCE_PARALELISM = 1;
+ private static final long MEMORYSIZE = 32;
+
+ @Test
+ public void wrongJobGraph() {
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+
+ try {
+ env.fromCollection(null);
+ fail();
+ } catch (NullPointerException e) {
+ }
+
+ try {
+ env.fromElements();
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.generateSequence(-10, -30);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.setBufferTimeout(-10);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.setExecutionParallelism(-10);
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+
+ try {
+ env.readTextFile("random/path/that/is/not/valid");
+ fail();
+ } catch (IllegalArgumentException e) {
+ }
+ }
+
+ private static class CoMap implements CoMapFunction<String, Long, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map1(String value) {
+ return value;
+ }
+
+ @Override
+ public String map2(Long value) {
+ return value.toString();
+ }
+ }
+
+ static HashSet<String> resultSet;
+
+ private static class SetSink implements SinkFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(String value) {
+ resultSet.add(value);
+ }
+ }
+
+ @Test
+ public void coTest() throws Exception {
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+ DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
+ DataStream<Long> generatedSequence = env.generateSequence(0, 3);
+
+ fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
+
+ resultSet = new HashSet<String>();
+ env.execute();
+
+ HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
+ "2", "3"));
+ assertEquals(expectedSet, resultSet);
+ }
+
+ @Test
+ public void runStream() throws Exception {
+ LocalStreamEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(SOURCE_PARALELISM);
+
+ env.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
+
+ env.executeTest(MEMORYSIZE);
+ assertEquals(10, data.keySet().size());
+
+ for (Integer k : data.keySet()) {
+ assertEquals((Integer) (k + 1), data.get(k));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
index 07314ea..c06f53a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
@@ -88,7 +88,7 @@ public class MockInvokable<IN, OUT> {
return iterator;
}
- public static <IN, OUT> List<OUT> createAndExecute(StreamOperatorInvokable<IN, OUT> invokable, List<IN> inputs) {
+ public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) {
MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
index 3baa08d..88673a3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
@@ -21,7 +21,7 @@ import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
+import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
import org.mockito.Mockito;
public class MockRecordWriterFactory {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/73371101/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 4232398..211daf6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WindowJoinLocal {
- private static final int PARALLELISM = 1;
- private static final int SOURCE_PARALLELISM = 1;
+ private static final int PARALLELISM = 4;
+ private static final int SOURCE_PARALLELISM = 2;
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.