Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:55 UTC

[12/18] git commit: [streaming] Minor bug fixes in Connectors, StreamCollector & docs

[streaming] Minor bug fixes in Connectors, StreamCollector & docs

This closes #115


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/439ca7ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/439ca7ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/439ca7ff

Branch: refs/heads/master
Commit: 439ca7ffe0a7a9fe856b22bfed50c031f062c7fb
Parents: 6b6951e
Author: Márton Balassi <ba...@gmail.com>
Authored: Sat Sep 20 13:14:57 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 81 ++++++++++----------
 .../connectors/twitter/TwitterSource.java       | 11 ++-
 .../api/collector/DirectedStreamCollector.java  | 29 ++++---
 .../api/collector/DirectedOutputTest.java       | 14 +++-
 4 files changed, 75 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index c1c6cde..87d851d 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -78,7 +78,7 @@ public class StreamingWordCount {
 Program Skeleton
 ----------------
 
-As we could already see in the example, a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
+As presented in the [example](#example), a Flink Streaming program looks almost identical to a regular Flink program. Each stream processing program consists of the following parts:
 
 1. Creating a `StreamExecutionEnvironment`,
 2. Connecting to data stream sources,
@@ -100,7 +100,7 @@ For connecting to data streams the `StreamExecutionEnvironment` has many differe
 env.readTextFile(filePath)
 ```
 
-After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which we will see in the [operations](#operations) section.
+After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
 
 ```java
 dataStream.map(new Mapper()).reduce(new Reducer())
@@ -126,30 +126,30 @@ Basics
 
 ### DataStream
 
-The `DataStream` is the basic abstraction provided by the the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations on the DataStreams will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window or batch aggregations work on an interval of data points at the same time.
+The `DataStream` is the basic abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window or batch aggregations work on an interval of data points at the same time.
  
-The different operations return different `DataStream` types allowing more elaborate transformations, for example the `groupBy()` method returns a `GroupedDataStream` which can be used for group operations.
+The operations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy()` method returns a `GroupedDataStream` which can be used for group operations.
 
 ### Partitioning
 
-Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default Forward partitioning is used. There are several partitioning types supported in Flink Streaming:
+Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default *Forward* partitioning is used. There are several partitioning types supported in Flink Streaming:
 
- * Forward: Forward partitioning directs the output data to the next operator on the same core (if possible) avoiding expensive network I/O. This is the default partitioner.
+ * *Forward*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. This is the default partitioner.
 Usage: `dataStream.forward()`
- * Shuffle: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution.
+ * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution.
 Usage: `dataStream.shuffle()`
- * Distribute: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
+ * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * Field: Field partitioning partitions the output data stream based on the hash code of a selected key field. Data points with the same key will always go to the same operator instance.
+ * *Field*: Field partitioning partitions the output data stream based on the hash code of a selected key field. Data points with the same key are directed to the same operator instance.
 Usage: `dataStream.partitionBy(keyposition)`
- * Broadcast: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
+ * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
- * Global: All data points end up at the same operator instance. To achieve a clearer structure use the parallelism setting of the corresponding operator for this.
+ * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
 Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to different data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
+The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
 
  * `env.genereateSequence(from, to)`
  * `env.fromElements(elements…)`
@@ -251,19 +251,19 @@ When the reduce operator is applied on a grouped data stream, the user-defined `
 
 ### Aggregations
 
-The Flink streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
+The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
 
 Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`
 
-For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, 0 is used as default. 
+For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, position `0` is used as default. 
 
 ### Window/Batch operators
 
 Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default.
 
-When applied to grouped data streams the data stream will be batched/windowed for different key values separately. 
+When applied to grouped data streams the data stream is batched/windowed for different key values separately. 
 
-For example a `ds.groupBy(0).batch(100, 10)` will produce batches of the last 100 elements for each key value with 10 record step size.
+For example a `dataStream.groupBy(0).batch(100, 10)` produces batches of the last 100 elements for each key value with 10 record step size.
  
 #### Reduce on windowed/batched data streams
 The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations.
@@ -291,10 +291,10 @@ Applies a CoMap transformation on two separate DataStreams, mapping them to a co
 A CoMap operator that outputs true if an Integer value is received and false if a String value is received:
 
 ```java
-DataStream<Integer> ds1 = ...
-DataStream<String> ds2 = ...
+DataStream<Integer> dataStream1 = ...
+DataStream<String> dataStream2 = ...
 		
-ds1.connect(ds2).
+dataStream1.connect(dataStream2)
 	.map(new CoMapFunction<Integer, String, Boolean>() {
 			
 			@Override
@@ -310,13 +310,13 @@ ds1.connect(ds2).
 ```
 
 #### FlatMap on ConnectedDataStream
-The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output zero or more values using the Collector interface. 
+The FlatMap operator for the `ConnectedDataStream` works similarly to CoMap, but instead of returning exactly one element after each map call the user can output arbitrarily many values using the Collector interface. 
 
 ```java
-DataStream<Integer> ds1 = ...
-DataStream<String> ds2 = ...
-
-ds1.connect(ds2)
+DataStream<Integer> dataStream1 = ...
+DataStream<String> dataStream2 = ...
+		
+dataStream1.connect(dataStream2)
 	.flatMap(new CoFlatMapFunction<Integer, String, Boolean>() {
 
 			@Override
@@ -337,7 +337,7 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
 <section id="output-splitting">
 ### Output splitting
 
-Most data stream operators support directed outputs. It means that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
+Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
 
 ```java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
@@ -345,7 +345,7 @@ DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ```
 
-Data streams will only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
+Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
 
 ```java
 void select(OUT value, Collection<String> outputs);
@@ -365,7 +365,7 @@ void select(Integer value, Collection<String> outputs) {
 ```
 
 This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
-It is common that a stream needs to listen to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
+It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
 
 
 ### Iterations
@@ -376,13 +376,13 @@ To start an iterative part of the program the user defines the iteration startin
 ```java
 IterativeDataStream<Integer> iteration = source.iterate();
 ```
-The operator applied on the iteration starting point will be the head of the iteration, where data is fed back from the iteration tail.
+The operator applied on the iteration starting point is the head of the iteration, where data is fed back from the iteration tail.
 
 ```java
 DataStream<Integer> head = iteration.map(new IterationHead());
 ```
 
-To close an iteration and define the iteration tail, the user needs to call `.closeWith(tail)` method of the `IterativeDataStream`:
+To close an iteration and define the iteration tail, the user calls `.closeWith(tail)` method of the `IterativeDataStream`:
 
 ```java
 DataStream<Integer> tail = head.map(new IterationTail());
@@ -394,19 +394,19 @@ SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelect
 iteration.closeWith(tail.select("iterate"));
 ``` 
 
-Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances we provide a method to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
-To use this function the user need to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time. 
+Because iterative streaming programs do not have a set number of iteratons for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
+To use this function the user needs to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time. 
 
 ### Rich functions
-The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take as argument a rich function:
+The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
 
 ```java
-ds.map(new RichMapFunction<String, Integer>() {
+dataStream.map(new RichMapFunction<String, Integer>() {
   public Integer map(String value) { return value.toString(); }
 });
 ```
 
-Rich functions provide, in addition to the user-defined function (`map`, `reduce`, etc), the `open` and `close` methods for initialization and finalization. (In contrast to the core API, the streaming API currently does not support the  `getRuntimeContext` and `setRuntimeContext` methods.)
+Rich functions provide, in addition to the user-defined function (`map()`, `reduce()`, etc), the `open()` and `close()` methods for initialization and finalization. (In contrast to the core API, the streaming API currently does not support the  `getRuntimeContext()` and `setRuntimeContext()` methods.)
 
 [Back to top](#top)
 
@@ -416,7 +416,7 @@ Operator Settings
 
 ### Parallelism
 
-Setting parallelism for operators works exactly the same way as in the core Flink API The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
+Setting parallelism for operators works exactly the same way as in the core Flink API. The user can control the number of parallel instances created for each operator by calling the `operator.setParallelism(dop)` method.
 
 ### Buffer timeout
 
@@ -432,7 +432,7 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 
 ### Mutability
 
-Most operators allows setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
+Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
 Usage:
 ```java
 operator.setMutability(isMutable)
@@ -859,7 +859,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.ra
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 
 In the example there are to connectors. One that sends messages to RabbitMQ and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
@@ -905,7 +906,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.ka
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 
 In the example there are to connectors. One that sends messages to Kafka and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
@@ -966,7 +968,8 @@ java -cp /PATH/TO/JAR-WITH-DEPENDENCIES org.apache.flink.streaming.connectors.fl
 ```
 
 The maven assemby plugin creates one jar with all the requiered dependencies. If the project is in a directory called git then the jar can be found here: (the version number may change later)
-```/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
+```batch
+/git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/target/flink-streaming-connectors-0.7-incubating-SNAPSHOT-jar-with-dependencies.jar ```
 In the example there are to connectors. One that sends messages to Flume and one that receives messages from the same queue. In the logger messages the arriving messages can be observed in the following format:
 
 ```

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 525f4c8..0ae3723 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.twitter;
 
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -27,10 +26,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
@@ -149,8 +148,8 @@ public class TwitterSource extends RichSourceFunction<String> {
 			InputStream input = new FileInputStream(authPath);
 			properties.load(input);
 			input.close();
-		} catch (IOException ioe) {
-			new RuntimeException("Cannot open .properties file: " + authPath, ioe);
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
 		}
 		return properties;
 	}
@@ -226,7 +225,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 				}
 			}
 		} catch (InterruptedException e) {
-			new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
 		}
 
 	}
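
Note on the two TwitterSource hunks above: before this commit the `RuntimeException` was constructed but never thrown, so the failure was silently dropped and execution continued. The following is a minimal, self-contained sketch of that difference; the class and method names (`MissingThrowDemo`, `loadSilently`, `loadStrictly`) are hypothetical and are not part of the Flink code base.

```java
import java.io.IOException;

class MissingThrowDemo {

    // Mirrors the pre-fix pattern: the exception object is created and
    // immediately discarded, so the caller never sees the failure.
    static String loadSilently(boolean fail) {
        try {
            if (fail) {
                throw new IOException("simulated read failure");
            }
        } catch (IOException e) {
            new RuntimeException("Cannot open .properties file", e); // no effect
        }
        return "loaded";
    }

    // Mirrors the post-fix pattern: the wrapped exception is actually thrown.
    static String loadStrictly(boolean fail) {
        try {
            if (fail) {
                throw new IOException("simulated read failure");
            }
        } catch (IOException e) {
            throw new RuntimeException("Cannot open .properties file", e);
        }
        return "loaded";
    }

    public static void main(String[] args) {
        // The silent variant reports success even though the read failed.
        System.out.println("silent: " + loadSilently(true));
        try {
            loadStrictly(true);
        } catch (RuntimeException e) {
            System.out.println("strict: " + e.getMessage());
        }
    }
}
```

In the sketch, `loadSilently(true)` still returns normally, which is the behaviour the added `throw` keyword removes.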

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index 42a2683..40fe3c6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -86,13 +86,6 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 		for (String outputName : outputNames) {
 			List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
 					.get(outputName);
-			if (outputList == null) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot emit because no output is selected with the name: {}",
-							outputName);
-				}
-			}
-
 			try {
 				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
 					if (!emitted.contains(output)) {
@@ -101,18 +94,32 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 					}
 				}
 
-				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
-					if (!emitted.contains(output)) {
-						output.emit(serializationDelegate);
-						emitted.add(output);
+				if (outputList == null) {
+					if (LOG.isErrorEnabled()) {
+						String format = String.format(
+								"Cannot emit because no output is selected with the name: %s",
+								outputName);
+						LOG.error(format);
+
+					}
+				} else {
+
+					for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
+						if (!emitted.contains(output)) {
+							output.emit(serializationDelegate);
+							emitted.add(output);
+						}
 					}
+
 				}
+
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
 					LOG.error("Emit to {} failed due to: {}", outputName,
 							StringUtils.stringifyException(e));
 				}
 			}
+
 		}
 	}
 }
\ No newline at end of file
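
Note on the DirectedStreamCollector hunks above: the null check now sits next to the loop over `outputList`, so that loop is skipped when no stream has selected the given output name, while the select-all outputs still receive the record. Below is a rough, self-contained sketch of that control flow. `DirectedEmitSketch` and `Sink` are hypothetical stand-ins (the real class works with `RecordWriter` and a serialization delegate), and logging is reduced to `System.err`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DirectedEmitSketch {

    // Hypothetical stand-in for a RecordWriter: it only records what it receives.
    static final class Sink {
        final List<Long> received = new ArrayList<>();

        void emit(Long value) {
            received.add(value);
        }
    }

    final Map<String, List<Sink>> outputMap = new HashMap<>();
    final List<Sink> selectAllOutputs = new ArrayList<>();

    void emit(Long value, List<String> outputNames) {
        // Tracks sinks that already got this value, so each gets it at most once.
        Set<Sink> emitted = new HashSet<>();
        for (String outputName : outputNames) {
            for (Sink sink : selectAllOutputs) {
                if (emitted.add(sink)) {
                    sink.emit(value);
                }
            }
            List<Sink> outputList = outputMap.get(outputName);
            if (outputList == null) {
                // Nobody selected this name: report it and skip the loop.
                System.err.println(
                        "Cannot emit because no output is selected with the name: " + outputName);
            } else {
                for (Sink sink : outputList) {
                    if (emitted.add(sink)) {
                        sink.emit(value);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        DirectedEmitSketch collector = new DirectedEmitSketch();
        Sink odd = new Sink();
        Sink all = new Sink();
        collector.outputMap.put("odd", new ArrayList<>(Arrays.asList(odd)));
        collector.selectAllOutputs.add(all);

        // "nonSelected" has no registered sinks, as in the updated test.
        collector.emit(11L, Arrays.asList("odd", "nonSelected"));
        System.out.println("odd=" + odd.received + " all=" + all.received);
    }
}
```

This matches what the updated `DirectedOutputTest` assertions suggest: a value directed to an unselected name is still delivered, once, to the streams that selected one of its other names or all outputs.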

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/439ca7ff/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e9d3994..85f3d7f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -43,6 +43,7 @@ public class DirectedOutputTest {
 	private static final String EVEN_AND_ODD = "evenAndOdd";
 	private static final String ODD_AND_TEN = "oddAndTen";
 	private static final String EVEN = "even";
+	private static final String NON_SELECTED = "nonSelected";
 
 	static final class MyMap implements MapFunction<Long, Long> {
 		private static final long serialVersionUID = 1L;
@@ -67,6 +68,10 @@ public class DirectedOutputTest {
 			if (value == 10L) {
 				outputs.add(TEN);
 			}
+			
+			if (value == 11L) {
+				outputs.add(NON_SELECTED);
+			}
 		}
 	}
 	
@@ -97,10 +102,11 @@ public class DirectedOutputTest {
 	
 	@Test
 	public void outputSelectorTest() throws Exception {
+		
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		
-		SplitDataStream<Long> source = env.generateSequence(1, 10).split(new MyOutputSelector());
+		SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
 		source.select(EVEN).addSink(new ListSink(EVEN));
 		source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
 		source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
@@ -108,8 +114,8 @@ public class DirectedOutputTest {
 		
 		env.executeTest(128);
 		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
-		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L), outputs.get(ODD_AND_TEN));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(EVEN_AND_ODD));
-		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L), outputs.get(ALL));
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
 	}
 }