You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/05/19 16:38:01 UTC
[4/7] flink git commit: [FLINK-1977] Rework Stream Operators to
always be push based
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7e09f22..a412e05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -33,38 +33,31 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.CoReaderIterator;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
+public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator {
private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
private final Object checkpointLock = new Object();
-
- private static int numTasks;
protected StreamConfig configuration;
- protected int instanceID;
- private static int numVertices = 0;
- private InputHandler<IN> inputHandler;
protected OutputHandler<OUT> outputHandler;
- private StreamOperator<IN, OUT> streamOperator;
- private boolean chained;
+
+ protected O streamOperator;
+
+ protected boolean hasChainedOperators;
+
protected volatile boolean isRunning = false;
- private StreamingRuntimeContext context;
+ protected StreamingRuntimeContext context;
protected ClassLoader userClassLoader;
@@ -72,106 +65,40 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
public StreamTask() {
streamOperator = null;
- numTasks = newTask();
- instanceID = numTasks;
superstepListener = new SuperstepEventListener();
}
- protected static int newTask() {
- numVertices++;
- return numVertices;
- }
-
@Override
public void registerInputOutput() {
- initialize();
- setInputsOutputs();
- setOperator();
- }
-
- protected void initialize() {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.context = createRuntimeContext(getEnvironment().getTaskName());
- }
- public void setInputsOutputs() {
- inputHandler = new InputHandler<IN>(this);
outputHandler = new OutputHandler<OUT>(this);
- chained = !outputHandler.getChainedOperators().isEmpty();
- }
- protected void setOperator() {
streamOperator = configuration.getStreamOperator(userClassLoader);
- streamOperator.setup(this);
+ if (streamOperator != null) {
+ // IterationHead and IterationTail don't have an Operator...
+ streamOperator.setup(outputHandler.getOutput(), this.context);
+ }
+
+ hasChainedOperators = !outputHandler.getChainedOperators().isEmpty();
}
public String getName() {
return getEnvironment().getTaskName();
}
- public int getInstanceID() {
- return instanceID;
- }
-
public StreamingRuntimeContext createRuntimeContext(String taskName) {
Environment env = getEnvironment();
return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(),
getExecutionConfig());
}
- @Override
- public void invoke() throws Exception {
- this.isRunning = true;
-
- boolean operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
- }
-
- try {
- streamOperator.setRuntimeContext(context);
-
- operatorOpen = true;
- openOperator();
-
- streamOperator.run();
-
- closeOperator();
- operatorOpen = false;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
- }
-
- } catch (Exception e) {
-
- if (operatorOpen) {
- try {
- closeOperator();
- } catch (Throwable t) {
- }
- }
-
- if (LOG.isErrorEnabled()) {
- LOG.error("StreamOperator failed due to: {}", StringUtils.stringifyException(e));
- }
- throw e;
- } finally {
- this.isRunning = false;
- // Cleanup
- outputHandler.flushOutputs();
- clearBuffers();
- }
-
- }
-
protected void openOperator() throws Exception {
streamOperator.open(getTaskConfiguration());
- for (ChainableStreamOperator<?, ?> operator : outputHandler.getChainedOperators()) {
- operator.setRuntimeContext(context);
+ for (OneInputStreamOperator<?, ?> operator : outputHandler.chainedOperators) {
operator.open(getTaskConfiguration());
}
}
@@ -179,8 +106,10 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
protected void closeOperator() throws Exception {
streamOperator.close();
- for (ChainableStreamOperator<?, ?> operator : outputHandler.getChainedOperators()) {
- operator.close();
+ // We need to close them first to last, since upstream operators in the chain might emit
+ // elements in their close methods.
+ for (int i = outputHandler.chainedOperators.size()-1; i >= 0; i--) {
+ outputHandler.chainedOperators.get(i).close();
}
}
@@ -188,61 +117,11 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
if (outputHandler != null) {
outputHandler.clearWriters();
}
- if (inputHandler != null) {
- inputHandler.clearReaders();
- }
}
@Override
public void cancel() {
- if (streamOperator != null) {
- streamOperator.cancel();
- }
- }
-
- @Override
- public StreamConfig getConfig() {
- return configuration;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- if (index == 0) {
- return (MutableObjectIterator<X>) inputHandler.getInputIter();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
- if (index == 0) {
- return (IndexedReaderIterator<X>) inputHandler.getInputIter();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
- if (index == 0) {
- return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @Override
- public Collector<OUT> getOutputCollector() {
- return outputHandler.getCollector();
- }
-
- @Override
- public <X, Y> CoReaderIterator<X, Y> getCoReader() {
- throw new IllegalArgumentException("CoReader not available");
+ this.isRunning = false;
}
public EventListener<TaskEvent> getSuperstepListener() {
@@ -262,34 +141,39 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
// loading the state described by the handle from the backup store
Serializable state = stateHandle.getState();
- if (chained) {
+ if (hasChainedOperators) {
@SuppressWarnings("unchecked")
List<Serializable> chainedStates = (List<Serializable>) state;
Serializable headState = chainedStates.get(0);
if (headState != null) {
- streamOperator.restoreInitialState(headState);
+ if (streamOperator instanceof StatefulStreamOperator) {
+ ((StatefulStreamOperator) streamOperator).restoreInitialState(headState);
+ }
}
for (int i = 1; i < chainedStates.size(); i++) {
Serializable chainedState = chainedStates.get(i);
if (chainedState != null) {
- outputHandler.getChainedOperators().get(i - 1).restoreInitialState(chainedState);
+ StreamOperator chainedOperator = outputHandler.getChainedOperators().get(i - 1);
+ if (chainedOperator instanceof StatefulStreamOperator) {
+ ((StatefulStreamOperator) chainedOperator).restoreInitialState(chainedState);
+ }
+
}
}
} else {
- streamOperator.restoreInitialState(state);
+ if (streamOperator instanceof StatefulStreamOperator) {
+ ((StatefulStreamOperator) streamOperator).restoreInitialState(state);
+ }
+
}
}
/**
* This method is either called directly by the checkpoint coordinator, or called
* when all incoming channels have reported a barrier
- *
- * @param checkpointId
- * @param timestamp
- * @throws Exception
*/
@Override
public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
@@ -302,18 +186,24 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
// first draw the state that should go into checkpoint
LocalStateHandle state;
try {
- Serializable userState = streamOperator.getStateSnapshotFromFunction(
- checkpointId, timestamp);
-
- if (chained) {
+
+ Serializable userState = null;
+
+ if (streamOperator instanceof StatefulStreamOperator) {
+ userState = ((StatefulStreamOperator) streamOperator).getStateSnapshotFromFunction(checkpointId, timestamp);
+ }
+
+
+ if (hasChainedOperators) {
// We construct a list of states for chained tasks
List<Serializable> chainedStates = new ArrayList<Serializable>();
chainedStates.add(userState);
- for (StreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
- chainedStates.add(chainedOperator.getStateSnapshotFromFunction(
- checkpointId, timestamp));
+ for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
+ if (chainedOperator instanceof StatefulStreamOperator) {
+ chainedStates.add(((StatefulStreamOperator) chainedOperator).getStateSnapshotFromFunction(checkpointId, timestamp));
+ }
}
userState = CollectionUtils.exists(chainedStates,
@@ -350,10 +240,15 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
public void confirmCheckpoint(long checkpointId, long timestamp) throws Exception {
// we do nothing here so far. this should call commit on the source function, for example
synchronized (checkpointLock) {
- streamOperator.confirmCheckpointCompleted(checkpointId, timestamp);
- if (chained) {
- for (StreamOperator<?, ?> op : outputHandler.getChainedOperators()) {
- op.confirmCheckpointCompleted(checkpointId, timestamp);
+ if (streamOperator instanceof StatefulStreamOperator) {
+ ((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+ }
+
+ if (hasChainedOperators) {
+ for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
+ if (chainedOperator instanceof StatefulStreamOperator) {
+ ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
deleted file mode 100644
index ba447d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.io.CoReaderIterator;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public interface StreamTaskContext<OUT> {
-
- StreamConfig getConfig();
-
- ClassLoader getUserCodeClassLoader();
-
- <X> MutableObjectIterator<X> getInput(int index);
-
- <X> IndexedReaderIterator<X> getIndexedInput(int index);
-
- <X> StreamRecordSerializer<X> getInputSerializer(int index);
-
- Collector<OUT> getOutputCollector();
-
- <X, Y> CoReaderIterator<X, Y> getCoReader();
-
- ExecutionConfig getExecutionConfig();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
new file mode 100644
index 0000000..1f7a9b6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.CoRecordReader;
+import org.apache.flink.streaming.runtime.io.InputGateFactory;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
+
+ protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
+ protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
+
+ CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
+ CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+
+ @Override
+ public void invoke() throws Exception {
+ this.isRunning = true;
+
+ boolean operatorOpen = false;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task {} invoked", getName());
+ }
+
+ try {
+
+ openOperator();
+ operatorOpen = true;
+
+ int next;
+ StreamRecord<IN1> reuse1 = inputDeserializer1.createInstance();
+ StreamRecord<IN2> reuse2 = inputDeserializer2.createInstance();
+
+ while (isRunning) {
+ try {
+ next = coIter.next(reuse1, reuse2);
+ } catch (IOException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ } catch (IllegalStateException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ }
+
+ if (next == 0) {
+ break;
+ } else if (next == 1) {
+ streamOperator.processElement1(reuse1.getObject());
+ reuse1 = inputDeserializer1.createInstance();
+ } else {
+ streamOperator.processElement2(reuse2.getObject());
+ reuse2 = inputDeserializer2.createInstance();
+ }
+ }
+
+ closeOperator();
+ operatorOpen = false;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task {} invocation finished", getName());
+ }
+
+ } catch (Exception e) {
+
+ if (operatorOpen) {
+ try {
+ closeOperator();
+ } catch (Throwable t) {
+ LOG.info("Caught exception while closing operator.", e);
+ }
+ }
+
+ if (LOG.isErrorEnabled()) {
+ LOG.error("StreamOperator failed. ", e);
+ }
+ throw e;
+ } finally {
+ this.isRunning = false;
+ // Cleanup
+ outputHandler.flushOutputs();
+ clearBuffers();
+ }
+
+ }
+
+ @Override
+ public void registerInputOutput() {
+ super.registerInputOutput();
+
+ inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+ inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+
+ int numberOfInputs = configuration.getNumberOfInputs();
+
+ ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+ ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+
+ List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+
+ for (int i = 0; i < numberOfInputs; i++) {
+ int inputType = inEdges.get(i).getTypeNumber();
+ InputGate reader = getEnvironment().getInputGate(i);
+ switch (inputType) {
+ case 1:
+ inputList1.add(reader);
+ break;
+ case 2:
+ inputList2.add(reader);
+ break;
+ default:
+ throw new RuntimeException("Invalid input type number: " + inputType);
+ }
+ }
+
+ final InputGate reader1 = InputGateFactory.createInputGate(inputList1);
+ final InputGate reader2 = InputGateFactory.createInputGate(inputList2);
+
+ coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
+ reader1, reader2);
+ coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
+ inputDeserializer1, inputDeserializer2);
+ }
+
+ @Override
+ public void clearBuffers() throws IOException {
+ super.clearBuffers();
+ coReader.clearBuffers();
+ coReader.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
new file mode 100644
index 0000000..1bfb13a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.util.MockSource;
+import org.junit.Test;
+
+public class SourceFunctionTest {
+
+ @Test
+ public void fromElementsTest() throws Exception {
+ List<Integer> expectedList = Arrays.asList(1, 2, 3);
+ List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
+ 2, 3));
+ assertEquals(expectedList, actualList);
+ }
+
+ @Test
+ public void fromCollectionTest() throws Exception {
+ List<Integer> expectedList = Arrays.asList(1, 2, 3);
+ List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
+ Arrays.asList(1, 2, 3)));
+ assertEquals(expectedList, actualList);
+ }
+
+ @Test
+ public void socketTextStreamTest() throws Exception {
+ // TODO: does not work because we cannot set the internal socket anymore
+// List<String> expectedList = Arrays.asList("a", "b", "c");
+// List<String> actualList = new ArrayList<String>();
+//
+// byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
+//
+// Socket socket = mock(Socket.class);
+// when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
+// when(socket.isClosed()).thenReturn(false);
+// when(socket.isConnected()).thenReturn(true);
+//
+// SocketTextStreamFunction source = new SocketTextStreamFunction("", 0, '\n', 0);
+// source.open(new Configuration());
+// while (!source.reachedEnd()) {
+// actualList.add(source.next());
+// }
+// assertEquals(expectedList, actualList);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
deleted file mode 100644
index e72f2d9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
-import org.apache.flink.streaming.util.MockCollector;
-import org.apache.flink.streaming.util.MockSource;
-import org.junit.Test;
-
-public class SourceTest {
-
- @Test
- public void fromElementsTest() {
- List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
- 2, 3));
- assertEquals(expectedList, actualList);
- }
-
- @Test
- public void fromCollectionTest() {
- List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
- Arrays.asList(1, 2, 3)));
- assertEquals(expectedList, actualList);
- }
-
- @Test
- public void socketTextStreamTest() throws Exception {
- List<String> expectedList = Arrays.asList("a", "b", "c");
- List<String> actualList = new ArrayList<String>();
-
- byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
-
- Socket socket = mock(Socket.class);
- when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
- when(socket.isClosed()).thenReturn(false);
- when(socket.isConnected()).thenReturn(true);
-
- new SocketTextStreamFunction("", 0, '\n', 0).streamFromSocket(new MockCollector<String>(
- actualList), socket);
- assertEquals(expectedList, actualList);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 35cbaba..ca6057c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -116,13 +116,15 @@ public class TypeFillTest {
private class TestSource<T> implements SourceFunction<T> {
- @Override
- public void run(Collector<T> collector) throws Exception {
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
}
@Override
- public void cancel() {
+ public T next() throws Exception {
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 32da578..118b23d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -36,7 +36,7 @@ public class StreamCollectorTest {
null);
sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
- Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, 2, sd);
+ Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, sd);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 67c1387..cfb21f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -477,32 +477,36 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
private static class PojoSource implements SourceFunction<OuterPojo> {
private static final long serialVersionUID = 1L;
+ long cnt = 0;
+
@Override
- public void run(Collector<OuterPojo> collector) throws Exception {
- for (long i = 0; i < 20; i++) {
- collector.collect(new OuterPojo(new InnerPojo(i / 2, "water_melon-b"), 2L));
- }
+ public boolean reachedEnd() throws Exception {
+ return cnt >= 20;
}
@Override
- public void cancel() {
- // no cleanup needed
+ public OuterPojo next() throws Exception {
+ OuterPojo result = new OuterPojo(new InnerPojo(cnt / 2, "water_melon-b"), 2L);
+ cnt++;
+ return result;
}
}
private static class TupleSource implements SourceFunction<Tuple2<Long, Tuple2<String, Long>>> {
private static final long serialVersionUID = 1L;
+ int cnt = 0;
+
@Override
- public void run(Collector<Tuple2<Long, Tuple2<String, Long>>> collector) throws Exception {
- for (int i = 0; i < 20; i++) {
- collector.collect(new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L)));
- }
+ public boolean reachedEnd() throws Exception {
+ return cnt >= 20;
}
@Override
- public void cancel() {
- // no cleanup needed
+ public Tuple2<Long, Tuple2<String, Long>> next() throws Exception {
+ Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
+ cnt++;
+ return result;
}
}
@@ -605,20 +609,20 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
private static class RectangleSource implements SourceFunction<RectangleClass> {
private static final long serialVersionUID = 1L;
+ RectangleClass rectangle = new RectangleClass(100, 100);
+ int cnt = 0;
@Override
- public void run(Collector<RectangleClass> collector) throws Exception {
- RectangleClass rectangle = new RectangleClass(100, 100);
-
- for (int i = 0; i < 100; i++) {
- collector.collect(rectangle);
- rectangle = rectangle.next();
- }
+ public boolean reachedEnd() throws Exception {
+ return cnt >= 100;
}
@Override
- public void cancel() {
- // no cleanup needed
+ public RectangleClass next() throws Exception {
+ RectangleClass result = rectangle;
+ cnt++;
+ rectangle = rectangle.next();
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
index faaadbc..b9e9717 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
index ae08825..dc6d0d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattenerTest.java
@@ -22,8 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
@@ -32,7 +31,7 @@ public class WindowFlattenerTest {
@Test
public void test() {
- StreamOperator<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
+ OneInputStreamOperator<StreamWindow<Integer>, Integer> flattener = new WindowFlattener<Integer>();
StreamWindow<Integer> w1 = StreamWindow.fromElements(1, 2, 3);
StreamWindow<Integer> w2 = new StreamWindow<Integer>();
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
index 944967a..3b54069 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowFolderTest.java
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowFolder;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
@@ -33,7 +32,7 @@ public class WindowFolderTest {
@Test
public void test() {
- StreamOperator<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
new FoldFunction<Integer, String>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index 0593c55..cdf39fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
@@ -157,36 +158,42 @@ public class WindowIntegrationTest implements Serializable {
.getDiscretizedStream().addSink(new TestSink12());
DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
+
+ private int i = 1;
@Override
- public void run(Collector<Integer> collector) throws Exception {
- for (int i = 1; i <= 10; i++) {
- collector.collect(i);
- }
+ public boolean reachedEnd() throws Exception {
+ return i > 10;
}
@Override
- public void cancel() {
+ public Integer next() throws Exception {
+ return i++;
}
+
});
DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
+ private int i = 1;
- private static final long serialVersionUID = 1L;
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ i = 1 + getRuntimeContext().getIndexOfThisSubtask();
+ }
@Override
- public void run(Collector<Integer> collector) throws Exception {
- for (int i = 1; i <= 11; i++) {
- if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
- collector.collect(i);
- }
- }
+ public boolean reachedEnd() throws Exception {
+ return i > 11;
}
@Override
- public void cancel() {
+ public Integer next() throws Exception {
+ int result = i;
+ i += 2;
+ return result;
}
+
});
source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
index f220a67..9836a99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMapperTest.java
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowMapper;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.util.Collector;
@@ -34,7 +33,7 @@ public class WindowMapperTest {
@Test
public void test() {
- StreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper = new WindowMapper<Integer, Integer>(
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMapper = new WindowMapper<Integer, Integer>(
new WindowMapFunction<Integer, Integer>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
index 1d1aa56..43e3785 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowMergerTest.java
@@ -24,8 +24,7 @@ import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
@@ -34,7 +33,7 @@ public class WindowMergerTest {
@Test
public void test() throws Exception {
- StreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowMerger = new WindowMerger<Integer>();
StreamWindow<Integer> w1 = new StreamWindow<Integer>();
StreamWindow<Integer> w2 = StreamWindow.fromElements(1, 2, 3, 4);
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
index c06a589..7521a2b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitionerTest.java
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
@@ -33,10 +32,10 @@ public class WindowPartitionerTest {
@Test
public void test() throws Exception {
- StreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner = new WindowPartitioner<Integer>(
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> splitPartitioner = new WindowPartitioner<Integer>(
2);
- StreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner = new WindowPartitioner<Integer>(
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> gbPartitioner = new WindowPartitioner<Integer>(
new MyKey());
StreamWindow<Integer> w1 = new StreamWindow<Integer>();
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
index 6e1afff..b78a5ba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowReducerTest.java
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
@@ -33,7 +32,7 @@ public class WindowReducerTest {
@Test
public void test() {
- StreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer = new WindowReducer<Integer>(
+ OneInputStreamOperator<StreamWindow<Integer>, StreamWindow<Integer>> windowReducer = new WindowReducer<Integer>(
new ReduceFunction<Integer>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
index e739111..0bb1848 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/StreamVertexTest.java
@@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
import org.junit.Test;
public class StreamVertexTest {
@@ -45,22 +44,22 @@ public class StreamVertexTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
public static class MySource implements SourceFunction<Tuple1<Integer>> {
- private static final long serialVersionUID = 1L;
-
private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
+ private int i = 0;
+
@Override
- public void run(Collector<Tuple1<Integer>> collector) throws Exception {
- for (int i = 0; i < 10; i++) {
- tuple.f0 = i;
- collector.collect(tuple);
- }
+ public boolean reachedEnd() throws Exception {
+ return i >= 10;
}
@Override
- public void cancel() {
- // No cleanup needed
+ public Tuple1<Integer> next() throws Exception {
+ tuple.f0 = i;
+ i++;
+ return tuple;
}
+
}
public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 697c796..7713994 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,31 +20,32 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.CoReaderIterator;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
+public class MockCoContext<IN1, IN2, OUT> {
// private Collection<IN1> input1;
// private Collection<IN2> input2;
private Iterator<IN1> inputIterator1;
private Iterator<IN2> inputIterator2;
private List<OUT> outputs;
- private Collector<OUT> collector;
+ private Output<OUT> collector;
private StreamRecordSerializer<IN1> inDeserializer1;
private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
private StreamRecordSerializer<IN2> inDeserializer2;
@@ -66,7 +67,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
outputs = new ArrayList<OUT>();
- collector = new MockCollector<OUT>(outputs);
+ collector = new MockOutput<OUT>(outputs);
}
private int currentInput = 1;
@@ -137,7 +138,7 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
return outputs;
}
- public Collector<OUT> getCollector() {
+ public Output<OUT> getCollector() {
return collector;
}
@@ -153,14 +154,57 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
return mockIterator;
}
- public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoStreamOperator<IN1, IN2, OUT> operator,
+ public static <IN1, IN2, OUT> List<OUT> createAndExecute(TwoInputStreamOperator<IN1, IN2, OUT> operator,
List<IN1> input1, List<IN2> input2) {
MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
- operator.setup(mockContext);
+ RuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+ new ExecutionConfig());
+
+ operator.setup(mockContext.collector, runtimeContext);
try {
operator.open(null);
- operator.run();
+
+ StreamRecordSerializer<IN1> inputDeserializer1 = mockContext.getInDeserializer1();
+ StreamRecordSerializer<IN2> inputDeserializer2 = mockContext.getInDeserializer2();
+ CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter = mockContext.mockIterator;
+
+ boolean isRunning = true;
+
+ int next;
+ StreamRecord<IN1> reuse1 = inputDeserializer1.createInstance();
+ StreamRecord<IN2> reuse2 = inputDeserializer2.createInstance();
+
+ while (isRunning) {
+ try {
+ next = coIter.next(reuse1, reuse2);
+ } catch (IOException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ } catch (IllegalStateException e) {
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record.", e);
+ } else {
+ // Task already cancelled do nothing
+ next = 0;
+ }
+ }
+
+ if (next == 0) {
+ break;
+ } else if (next == 1) {
+ operator.processElement1(reuse1.getObject());
+ reuse1 = inputDeserializer1.createInstance();
+ } else {
+ operator.processElement2(reuse2.getObject());
+ reuse2 = inputDeserializer2.createInstance();
+ }
+ }
+
operator.close();
} catch (Exception e) {
throw new RuntimeException("Cannot invoke operator.", e);
@@ -168,63 +212,4 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
return mockContext.getOutputs();
}
-
- @Override
- public StreamConfig getConfig() {
- return null;
- }
-
- @Override
- public ClassLoader getUserCodeClassLoader() {
- return null;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- switch (index) {
- case 0:
- return (MutableObjectIterator<X>) inputIterator1;
- case 1:
- return (MutableObjectIterator<X>) inputIterator2;
- default:
- throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
- switch (index) {
- case 0:
- return (StreamRecordSerializer<X>) inDeserializer1;
- case 1:
- return (StreamRecordSerializer<X>) inDeserializer2;
- default:
- throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X, Y> CoReaderIterator<X, Y> getCoReader() {
- return (CoReaderIterator<X, Y>) mockIterator;
- }
-
- @Override
- public Collector<OUT> getOutputCollector() {
- return collector;
- }
-
- @Override
- public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
- throw new UnsupportedOperationException(
- "Indexed iterator is currently unsupported for connected streams.");
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return new ExecutionConfig();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
deleted file mode 100644
index e8b96c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.util.Collector;
-
-public class MockCollector<T> implements Collector<T> {
- private Collection<T> outputs;
-
- public MockCollector(Collection<T> outputs) {
- this.outputs = outputs;
- }
-
- @Override
- public void collect(T record) {
- T copied = SerializationUtils.deserialize(SerializationUtils
- .serialize((Serializable) record));
- outputs.add(copied);
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 709a59a..8b5607f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,27 +20,29 @@ package org.apache.flink.streaming.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
+public class MockContext<IN, OUT> {
private Collection<IN> inputs;
private List<OUT> outputs;
- private Collector<OUT> collector;
+ private MockOutput<OUT> output;
private StreamRecordSerializer<IN> inDeserializer;
private IndexedReaderIterator<StreamRecord<IN>> iterator;
@@ -55,7 +57,7 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
iterator = new IndexedInputIterator();
outputs = new ArrayList<OUT>();
- collector = new MockCollector<OUT>(outputs);
+ output = new MockOutput<OUT>(outputs);
}
private class IndexedInputIterator extends IndexedReaderIterator<StreamRecord<IN>> {
@@ -92,25 +94,29 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
return outputs;
}
- public Collector<OUT> getCollector() {
- return collector;
- }
-
- public StreamRecordSerializer<IN> getInDeserializer() {
- return inDeserializer;
+ public Collector<OUT> getOutput() {
+ return output;
}
public MutableObjectIterator<StreamRecord<IN>> getIterator() {
return iterator;
}
- public static <IN, OUT> List<OUT> createAndExecute(StreamOperator<IN, OUT> operator,
+ public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator,
List<IN> inputs) {
MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
- operator.setup(mockContext);
+ RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
+ new ExecutionConfig());
+
+ operator.setup(mockContext.output, runtimeContext);
try {
operator.open(null);
- operator.run();
+
+ StreamRecord<IN> nextRecord;
+ while ((nextRecord = mockContext.getIterator().next()) != null) {
+ operator.processElement(nextRecord.getObject());
+ }
+
operator.close();
} catch (Exception e) {
throw new RuntimeException("Cannot invoke operator.", e);
@@ -118,56 +124,4 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
return mockContext.getOutputs();
}
-
- @Override
- public StreamConfig getConfig() {
- return null;
- }
-
- @Override
- public ClassLoader getUserCodeClassLoader() {
- return null;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> MutableObjectIterator<X> getInput(int index) {
- if (index == 0) {
- return (MutableObjectIterator<X>) iterator;
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
- if (index == 0) {
- return (StreamRecordSerializer<X>) inDeserializer;
- } else {
- throw new IllegalArgumentException("There is only 1 input");
- }
- }
-
- @Override
- public Collector<OUT> getOutputCollector() {
- return collector;
- }
-
- @Override
- public <X, Y> CoReaderIterator<X, Y> getCoReader() {
- throw new IllegalArgumentException("CoReader not available");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
- return (IndexedReaderIterator<X>) iterator;
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return new ExecutionConfig();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
new file mode 100644
index 0000000..6799d87
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.util.Collector;
+
+public class MockOutput<T> implements Output<T> {
+ private Collection<T> outputs;
+
+ public MockOutput(Collection<T> outputs) {
+ this.outputs = outputs;
+ }
+
+ @Override
+ public void collect(T record) {
+ T copied = SerializationUtils.deserialize(SerializationUtils
+ .serialize((Serializable) record));
+ outputs.add(copied);
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index 2f6e450..95cb65c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -20,14 +20,23 @@ package org.apache.flink.streaming.util;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.Collector;
public class MockSource<T> {
- public static <T> List<T> createAndExecute(SourceFunction<T> source) {
+ public static <T> List<T> createAndExecute(SourceFunction<T> sourceFunction) throws Exception {
List<T> outputs = new ArrayList<T>();
+ if (sourceFunction instanceof RichSourceFunction) {
+ ((RichSourceFunction<T>) sourceFunction).open(new Configuration());
+ }
try {
- source.run(new MockCollector<T>(outputs));
+ Collector<T> collector = new MockOutput<T>(outputs);
+ while (!sourceFunction.reachedEnd()) {
+ collector.collect(sourceFunction.next());
+ }
} catch (Exception e) {
throw new RuntimeException("Cannot invoke source.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 86c739c..7d985ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
@@ -112,20 +111,19 @@ public class IterateExample {
private Random rnd = new Random();
@Override
- public void run(Collector<Tuple2<Integer, Integer>> collector) throws Exception {
- while (true) {
- int first = rnd.nextInt(BOUND / 2 - 1) + 1;
- int second = rnd.nextInt(BOUND / 2 - 1) + 1;
-
- collector.collect(new Tuple2<Integer, Integer>(first, second));
- Thread.sleep(500L);
- }
+ public boolean reachedEnd() throws Exception {
+ return false;
}
@Override
- public void cancel() {
- // no cleanup needed
+ public Tuple2<Integer, Integer> next() throws Exception {
+ int first = rnd.nextInt(BOUND / 2 - 1) + 1;
+ int second = rnd.nextInt(BOUND / 2 - 1) + 1;
+
+ Thread.sleep(500L);
+ return new Tuple2<Integer, Integer>(first, second);
}
+
}
/**
@@ -240,4 +238,4 @@ public class IterateExample {
return true;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index cf9edfe..68df7b0 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.util.Collector;
import java.util.Random;
@@ -111,19 +110,18 @@ public class WindowJoin {
}
@Override
- public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- }
+ public boolean reachedEnd() throws Exception {
+ return false;
}
-
+
@Override
- public void cancel() {
- // No cleanup needed
+ public Tuple2<String, Integer> next() throws Exception {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ return outTuple;
}
+
}
/**
@@ -142,19 +140,18 @@ public class WindowJoin {
}
@Override
- public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
- while (true) {
- outTuple.f0 = names[rand.nextInt(names.length)];
- outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
- out.collect(outTuple);
- Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
- }
+ public boolean reachedEnd() throws Exception {
+ return false;
}
-
+
@Override
- public void cancel() {
- // No cleanup needed
+ public Tuple2<String, Integer> next() throws Exception {
+ outTuple.f0 = names[rand.nextInt(names.length)];
+ outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+ Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+ return outTuple;
}
+
}
public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index bfc9e1a..9fb7cae 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -95,21 +95,16 @@ public class IncrementalLearningSkeleton {
private static final int NEW_DATA_SLEEP_TIME = 1000;
@Override
- public void run(Collector<Integer> collector) throws Exception {
- while (true) {
- collector.collect(getNewData());
- }
+ public boolean reachedEnd() throws Exception {
+ return false;
}
- private Integer getNewData() throws InterruptedException {
+ @Override
+ public Integer next() throws Exception {
Thread.sleep(NEW_DATA_SLEEP_TIME);
return 1;
}
-
- @Override
- public void cancel() {
- // No cleanup needed
- }
+
}
/**
@@ -120,24 +115,22 @@ public class IncrementalLearningSkeleton {
private static final long serialVersionUID = 1L;
private int counter;
- @Override
- public void run(Collector<Integer> collector) throws Exception {
- Thread.sleep(15);
- while (counter < 50) {
- collector.collect(getNewData());
- }
+ private Integer getNewData() throws InterruptedException {
+ Thread.sleep(5);
+ counter++;
+ return 1;
}
@Override
- public void cancel() {
- // No cleanup needed
+ public boolean reachedEnd() throws Exception {
+ return counter >= 50;
}
- private Integer getNewData() throws InterruptedException {
- Thread.sleep(5);
- counter++;
- return 1;
+ @Override
+ public Integer next() throws Exception {
+ return getNewData();
}
+
}
/**
@@ -149,23 +142,16 @@ public class IncrementalLearningSkeleton {
private static final int TRAINING_DATA_SLEEP_TIME = 10;
@Override
- public void run(Collector<Integer> collector) throws Exception {
- while (true) {
- collector.collect(getTrainingData());
- }
-
+ public boolean reachedEnd() throws Exception {
+ return false;
}
- private Integer getTrainingData() throws InterruptedException {
+ @Override
+ public Integer next() throws Exception {
Thread.sleep(TRAINING_DATA_SLEEP_TIME);
return 1;
-
- }
-
- @Override
- public void cancel() {
- // No cleanup needed
}
+
}
/**
@@ -176,22 +162,21 @@ public class IncrementalLearningSkeleton {
private static final long serialVersionUID = 1L;
private int counter = 0;
- @Override
- public void run(Collector<Integer> collector) throws Exception {
- while (counter < 8200) {
- collector.collect(getTrainingData());
- }
+ private Integer getTrainingData() throws InterruptedException {
+ counter++;
+ return 1;
}
@Override
- public void cancel() {
- // No cleanup needed
+ public boolean reachedEnd() throws Exception {
+ return counter >= 8200;
}
- private Integer getTrainingData() throws InterruptedException {
- counter++;
- return 1;
+ @Override
+ public Integer next() throws Exception {
+ return getTrainingData();
}
+
}
public static class LinearTimestamp implements Timestamp<Integer> {
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 1522910..f0ebccc 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.flink.streaming.examples.windowing;
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
@@ -55,25 +54,24 @@ public class SessionWindowing {
DataStream<Tuple3<String, Long, Integer>> source = env
.addSource(new SourceFunction<Tuple3<String, Long, Integer>>() {
+ int index = 0;
@Override
- public void run(Collector<Tuple3<String, Long, Integer>> collector)
- throws Exception {
- for (Tuple3<String, Long, Integer> value : input) {
- // We sleep three seconds between every output so we
- // can see whether we properly detect sessions
- // before the next start for a specific id
- collector.collect(value);
- if (!fileOutput) {
- System.out.println("Collected: " + value);
- Thread.sleep(3000);
- }
- }
+ public boolean reachedEnd() throws Exception {
+ return index >= input.size();
}
@Override
- public void cancel() {
+ public Tuple3<String, Long, Integer> next() throws Exception {
+ Tuple3<String, Long, Integer> result = input.get(index);
+ index++;
+ if (!fileOutput) {
+ System.out.println("Collected: " + result);
+ Thread.sleep(3000);
+ }
+ return result;
}
+
});
// We create sessions for each id with max timeout of 3 time units
http://git-wip-us.apache.org/repos/asf/flink/blob/58865ff3/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index d745fc5..0974f3d 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -28,11 +28,12 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
@@ -254,34 +255,42 @@ public class StockPrices {
// USER FUNCTIONS
// *************************************************************************
- public final static class StockSource implements SourceFunction<StockPrice> {
+ public final static class StockSource extends RichSourceFunction<StockPrice> {
private static final long serialVersionUID = 1L;
private Double price;
private String symbol;
private Integer sigma;
+ private transient Random random;
+
+
public StockSource(String symbol, Integer sigma) {
this.symbol = symbol;
this.sigma = sigma;
+ price = DEFAULT_PRICE;
+
}
@Override
- public void run(Collector<StockPrice> collector) throws Exception {
- price = DEFAULT_PRICE;
- Random random = new Random();
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ random = new Random();
- while (true) {
- price = price + random.nextGaussian() * sigma;
- collector.collect(new StockPrice(symbol, price));
- Thread.sleep(random.nextInt(200));
- }
}
-
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
@Override
- public void cancel() {
- // No cleanup needed
+ public StockPrice next() throws Exception {
+ price = price + random.nextGaussian() * sigma;
+ Thread.sleep(random.nextInt(200));
+ return new StockPrice(symbol, price);
}
+
}
public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
@@ -305,33 +314,35 @@ public class StockPrices {
}
}
- public static final class TweetSource implements SourceFunction<String> {
+ public static final class TweetSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
- Random random;
- StringBuilder stringBuilder;
+ private transient Random random;
+ private transient StringBuilder stringBuilder;
@Override
- public void run(Collector<String> collector) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
random = new Random();
stringBuilder = new StringBuilder();
+ }
- while (true) {
- stringBuilder.setLength(0);
- for (int i = 0; i < 3; i++) {
- stringBuilder.append(" ");
- stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
- }
- collector.collect(stringBuilder.toString());
- Thread.sleep(500);
- }
-
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
}
-
+
@Override
- public void cancel() {
- // No cleanup needed
+ public String next() throws Exception {
+ stringBuilder.setLength(0);
+ for (int i = 0; i < 3; i++) {
+ stringBuilder.append(" ");
+ stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
+ }
+ Thread.sleep(500);
+ return stringBuilder.toString();
}
+
}
public static final class SendWarning implements WindowMapFunction<StockPrice, String> {