You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/08/19 18:42:12 UTC
[3/6] flink git commit: [FLINK-2398][api-breaking] Introduce
StreamGraphGenerator
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
new file mode 100644
index 0000000..d392fd5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/SplitTransformation.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a split of one
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} into several {@code DataStreams}
+ * using an {@link org.apache.flink.streaming.api.collector.selector.OutputSelector}.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code SplitTransformation}
+ */
+public class SplitTransformation<T> extends StreamTransformation<T> {
+
+ private final StreamTransformation<T> input;
+
+ private final OutputSelector<T> outputSelector;
+
+ /**
+ * Creates a new {@code SplitTransformation} from the given input and {@code OutputSelector}.
+ *
+ * @param input The input {@code StreamTransformation}
+ * @param outputSelector The output selector
+ */
+ public SplitTransformation(StreamTransformation<T> input,
+ OutputSelector<T> outputSelector) {
+ super("Split", input.getOutputType(), input.getParallelism());
+ this.input = input;
+ this.outputSelector = outputSelector;
+ }
+
+ /**
+ * Returns the input {@code StreamTransformation}.
+ */
+ public StreamTransformation<T> getInput() {
+ return input;
+ }
+
+ /**
+ * Returns the {@code OutputSelector}
+ */
+ public OutputSelector<T> getOutputSelector() {
+ return outputSelector;
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input.getTransitivePredecessors());
+ return result;
+ }
+
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
new file mode 100644
index 0000000..dadcfa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+
+/**
+ * A {@code StreamTransformation} represents the operation that creates a
+ * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
+ * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
+ * {@code StreamTransformation} that is the origin of said DataStream.
+ *
+ * <p>
+ * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
+ * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
+ * graph is translated to a {@link StreamGraph} using
+ * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
+ *
+ * <p>
+ * A {@code StreamTransformation} does not necessarily correspond to a physical operation
+ * at runtime. Some operations are only logical concepts. Examples of this are union,
+ * split/select data stream, partitioning.
+ *
+ * <p>
+ * The following graph of {@code StreamTransformations}:
+ *
+ * <pre>
+ * Source Source
+ * + +
+ * | |
+ * v v
+ * Rebalance HashPartition
+ * + +
+ * | |
+ * | |
+ * +------>Union<------+
+ * +
+ * |
+ * v
+ * Split
+ * +
+ * |
+ * v
+ * Select
+ * +
+ * v
+ * Map
+ * +
+ * |
+ * v
+ * Sink
+ * </pre>
+ *
+ * Would result in this graph of operations at runtime:
+ *
+ * <pre>
+ * Source Source
+ * + +
+ * | |
+ * | |
+ * +------->Map<-------+
+ * +
+ * |
+ * v
+ * Sink
+ * </pre>
+ *
+ * The information about partitioning, union, split/select end up being encoded in the edges
+ * that connect the sources to the map operation.
+ *
+ * @param <T> The type of the elements that result from this {@code StreamTransformation}
+ */
+public abstract class StreamTransformation<T> {
+
+ // This is used to assign a unique ID to every StreamTransformation
+ protected static Integer idCounter = 0;
+ public static int getNewNodeId() {
+ idCounter++;
+ return idCounter;
+ }
+
+ protected final int id;
+
+ protected String name;
+
+ protected TypeInformation<T> outputType;
+ // This is used to handle MissingTypeInfo. As long as the outputType has not been queried
+ // it can still be changed using setOutputType(). Afterwards an exception is thrown when
+ // trying to change the output type.
+ protected boolean typeUsed;
+
+ private int parallelism;
+
+ protected long bufferTimeout = -1;
+
+ protected StreamGraph.ResourceStrategy resourceStrategy = StreamGraph.ResourceStrategy.DEFAULT;
+
+ /**
+ * Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
+ *
+ * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+ * @param outputType The output type of this {@code StreamTransformation}
+ * @param parallelism The parallelism of this {@code StreamTransformation}
+ */
+ public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
+ this.id = getNewNodeId();
+ this.name = Preconditions.checkNotNull(name);
+ this.outputType = outputType;
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Returns the unique ID of this {@code StreamTransformation}.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * Changes the name of this {@code StreamTransformation}.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Returns the name of this {@code StreamTransformation}.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the parallelism of this {@code StreamTransformation}
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ /**
+ * Sets the parallelism of this {@code StreamTransformation}
+ * @param parallelism The new parallelism to set on this {@code StreamTransformation}
+ */
+ public void setParallelism(int parallelism) {
+ Preconditions.checkArgument(parallelism > 0, "Parallelism must be bigger than zero.");
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Tries to fill in the type information. Type information can be filled in
+ * later when the program uses a type hint. This method checks whether the
+ * type information has ever been accessed before and does not allow
+ * modifications if the type was accessed already. This ensures consistency
+ * by making sure different parts of the operation do not assume different
+ * type information.
+ *
+ * @param outputType The type information to fill in.
+ *
+ * @throws IllegalStateException Thrown, if the type information has been accessed before.
+ */
+ public void setOutputType(TypeInformation<T> outputType) {
+ if (typeUsed) {
+ throw new IllegalStateException(
+ "TypeInformation cannot be filled in for the type after it has been used. "
+ + "Please make sure that the type info hints are the first call after"
+ + " the transformation function, "
+ + "before any access to types or semantic properties, etc.");
+ }
+ this.outputType = outputType;
+ }
+
+ /**
+ * Returns the output type of this {@code StreamTransformation} as a {@link TypeInformation}. After
+ * this method has returned once, the output type can no longer be changed using {@link #setOutputType}.
+ *
+ * @return The output type of this {@code StreamTransformation}
+ */
+ public TypeInformation<T> getOutputType() {
+ if (outputType instanceof MissingTypeInfo) {
+ MissingTypeInfo typeInfo = (MissingTypeInfo) this.outputType;
+ throw new InvalidTypesException(
+ "The return type of function '"
+ + typeInfo.getFunctionName()
+ + "' could not be determined automatically, due to type erasure. "
+ + "You can give type information hints by using the returns(...) "
+ + "method on the result of the transformation call, or by letting "
+ + "your function implement the 'ResultTypeQueryable' "
+ + "interface.", typeInfo.getTypeException());
+ }
+ typeUsed = true;
+ return this.outputType;
+ }
+
+ /**
+ * Sets the chaining strategy of this {@code StreamTransformation}.
+ */
+ public abstract void setChainingStrategy(StreamOperator.ChainingStrategy strategy);
+
+ /**
+ * Set the buffer timeout of this {@code StreamTransformation}. The timeout is used when
+ * sending elements over the network. The timeout specifies how long a network buffer
+ * should be kept waiting before sending. A higher timeout means that more elements will
+ * be sent in one buffer, this increases throughput. The latency, however, is negatively
+ * affected by a higher timeout.
+ */
+ public void setBufferTimeout(long bufferTimeout) {
+ this.bufferTimeout = bufferTimeout;
+ }
+
+ /**
+ * Returns the buffer timeout of this {@code StreamTransformation}.
+ *
+ * <p>
+ * See {@link #setBufferTimeout}.
+ */
+ public long getBufferTimeout() {
+ return bufferTimeout;
+ }
+
+ /**
+ * Sets the {@link org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy} of this
+ * {@code StreamTransformation}. The resource strategy is used when scheduling operations on actual
+ * workers when transforming the StreamTopology to an
+ * {@link org.apache.flink.runtime.executiongraph.ExecutionGraph}.
+ */
+ public void setResourceStrategy(StreamGraph.ResourceStrategy resourceStrategy) {
+ this.resourceStrategy = resourceStrategy;
+ }
+
+ /**
+ * Returns the {@code ResourceStrategy} of this {@code StreamTransformation}.
+ *
+ * <p>
+ * See {@link #setResourceStrategy}.
+ */
+ public StreamGraph.ResourceStrategy getResourceStrategy() {
+ return resourceStrategy;
+ }
+
+ /**
+ * Returns all transitive predecessor {@code StreamTransformation}s of this {@code StreamTransformation}. This
+ * is, for example, used when determining whether a feedback edge of an iteration
+ * actually has the iteration head as a predecessor.
+ *
+ * @return The list of transitive predecessors.
+ */
+ public abstract Collection<StreamTransformation<?>> getTransitivePredecessors();
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" +
+ "id=" + id +
+ ", name='" + name + '\'' +
+ ", outputType=" + outputType +
+ ", parallelism=" + parallelism +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof StreamTransformation)) {
+ return false;
+ }
+
+ StreamTransformation<?> that = (StreamTransformation<?>) o;
+
+ if (bufferTimeout != that.bufferTimeout) {
+ return false;
+ }
+ if (id != that.id) {
+ return false;
+ }
+ if (parallelism != that.parallelism) {
+ return false;
+ }
+ if (!name.equals(that.name)) {
+ return false;
+ }
+ if (outputType != null ? !outputType.equals(that.outputType) : that.outputType != null) {
+ return false;
+ }
+ return resourceStrategy == that.resourceStrategy;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id;
+ result = 31 * result + name.hashCode();
+ result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
+ result = 31 * result + parallelism;
+ result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
+ result = 31 * result + resourceStrategy.hashCode();
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
new file mode 100644
index 0000000..e7273c5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This Transformation represents the application of a
+ * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to two input
+ * {@code StreamTransformations}. The result is again only one stream.
+ *
+ * @param <IN1> The type of the elements in the first input {@code StreamTransformation}
+ * @param <IN2> The type of the elements in the second input {@code StreamTransformation}
+ * @param <OUT> The type of the elements that result from this {@code TwoInputTransformation}
+ */
+public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {
+
+ private final StreamTransformation<IN1> input1;
+ private final StreamTransformation<IN2> input2;
+
+ private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
+
+ /**
+ * Creates a new {@code TwoInputTransformation} from the given inputs and operator.
+ *
+ * @param input1 The first input {@code StreamTransformation}
+ * @param input2 The second input {@code StreamTransformation}
+ * @param name The name of the {@code StreamTransformation}, this will be shown in Visualizations and the Log
+ * @param operator The {@code TwoInputStreamOperator}
+ * @param outputType The type of the elements produced by this Transformation
+ * @param parallelism The parallelism of this Transformation
+ */
+ public TwoInputTransformation(
+ StreamTransformation<IN1> input1,
+ StreamTransformation<IN2> input2,
+ String name,
+ TwoInputStreamOperator<IN1, IN2, OUT> operator,
+ TypeInformation<OUT> outputType,
+ int parallelism) {
+ super(name, outputType, parallelism);
+ this.input1 = input1;
+ this.input2 = input2;
+ this.operator = operator;
+ }
+
+ /**
+ * Returns the first input {@code StreamTransformation} of this {@code TwoInputTransformation}.
+ */
+ public StreamTransformation<IN1> getInput1() {
+ return input1;
+ }
+
+ /**
+ * Returns the second input {@code StreamTransformation} of this {@code TwoInputTransformation}.
+ */
+ public StreamTransformation<IN2> getInput2() {
+ return input2;
+ }
+
+ /**
+ * Returns the {@code TypeInformation} for the elements from the first input.
+ */
+ public TypeInformation<IN1> getInputType1() {
+ return input1.getOutputType();
+ }
+
+ /**
+ * Returns the {@code TypeInformation} for the elements from the second input.
+ */
+ public TypeInformation<IN2> getInputType2() {
+ return input2.getOutputType();
+ }
+
+ /**
+ * Returns the {@code TwoInputStreamOperator} of this Transformation.
+ */
+ public TwoInputStreamOperator<IN1, IN2, OUT> getOperator() {
+ return operator;
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ result.addAll(input1.getTransitivePredecessors());
+ result.addAll(input2.getTransitivePredecessors());
+ return result;
+ }
+
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ operator.setChainingStrategy(strategy);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
new file mode 100644
index 0000000..4fa3c0a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.transformations;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This transformation represents a union of several input
+ * {@link StreamTransformation StreamTransformations}.
+ *
+ * <p>
+ * This does not create a physical operation, it only affects how upstream operations are
+ * connected to downstream operations.
+ *
+ * @param <T> The type of the elements that result from this {@code UnionTransformation}
+ */
+public class UnionTransformation<T> extends StreamTransformation<T> {
+ private final List<StreamTransformation<T>> inputs;
+
+ /**
+ * Creates a new {@code UnionTransformation} from the given input {@code StreamTransformations}.
+ *
+ * <p>
+ * The input {@code StreamTransformations} must all have the same type.
+ *
+ * @param inputs The list of input {@code StreamTransformations}
+ */
+ public UnionTransformation(List<StreamTransformation<T>> inputs) {
+ super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());
+
+ for (StreamTransformation<T> input: inputs) {
+ if (!input.getOutputType().equals(getOutputType())) {
+ throw new UnsupportedOperationException("Type mismatch in input " + input);
+ }
+ }
+
+ this.inputs = Lists.newArrayList(inputs);
+ }
+
+ /**
+ * Returns the list of input {@code StreamTransformations}.
+ */
+ public List<StreamTransformation<T>> getInputs() {
+ return inputs;
+ }
+
+ @Override
+ public Collection<StreamTransformation<?>> getTransitivePredecessors() {
+ List<StreamTransformation<?>> result = Lists.newArrayList();
+ result.add(this);
+ for (StreamTransformation<T> input: inputs) {
+ result.addAll(input.getTransitivePredecessors());
+ }
+ return result;
+ }
+
+ @Override
+ public final void setChainingStrategy(StreamOperator.ChainingStrategy strategy) {
+ throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
index f51a04f..f3d851c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitioner.java
@@ -23,8 +23,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
* Partitioner that selects all the output channels.
*
- * @param <T>
- * Type of the Tuple
+ * @param <T> Type of the elements in the Stream being broadcast
*/
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
@@ -33,10 +32,6 @@ public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
boolean set;
int setNumber;
- public BroadcastPartitioner() {
- super(PartitioningStrategy.BROADCAST);
- }
-
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
@@ -52,4 +47,14 @@ public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
return returnArray;
}
}
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "BROADCAST";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 6c40c03..7bb9480 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -38,7 +38,6 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
KeySelector<T, K> keySelector;
public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
- super(PartitioningStrategy.CUSTOM);
this.partitioner = partitioner;
this.keySelector = keySelector;
}
@@ -58,4 +57,14 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
return returnArray;
}
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "CUSTOM";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
deleted file mode 100644
index 7026d45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner that selects the same (one) channel for two Tuples having a
- * specified fields equal.
- *
- * @param <T>
- * Type of the Tuple
- */
-public class FieldsPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[1];
- KeySelector<T, ?> keySelector;
-
- public FieldsPartitioner(KeySelector<T, ?> keySelector) {
- super(PartitioningStrategy.GROUPBY);
- this.keySelector = keySelector;
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- Object key;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
- }
- returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
-
- return returnArray;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
new file mode 100644
index 0000000..4fb460c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that forwards elements only to the locally running downstream operation.
+ *
+ * @param <T> Type of the elements in the Stream
+ */
+public class ForwardPartitioner<T> extends StreamPartitioner<T> {
+ private static final long serialVersionUID = 1L;
+
+ private int[] returnArray = new int[] {0};
+
+ @Override
+ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+ return returnArray;
+ }
+
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "FORWARD";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index 46b290b..b19fb41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -20,19 +20,29 @@ package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-//Group to the partitioner with the lowest id
+/**
+ * Partitioner that sends all elements to the downstream operator with subtask ID=0.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ */
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private int[] returnArray = new int[] { 0 };
- public GlobalPartitioner() {
- super(PartitioningStrategy.GLOBAL);
- }
-
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
return returnArray;
}
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "GLOBAL";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
new file mode 100644
index 0000000..a3f5158
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Partitioner that selects the target channel based on the hash value of a key
+ * extracted by a {@link KeySelector}.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ */
+public class HashPartitioner<T> extends StreamPartitioner<T> {
+ private static final long serialVersionUID = 1L;
+
+ private int[] returnArray = new int[1];
+ KeySelector<T, ?> keySelector;
+
+ public HashPartitioner(KeySelector<T, ?> keySelector) {
+ this.keySelector = keySelector;
+ }
+
+ @Override
+ public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
+ int numberOfOutputChannels) {
+ Object key;
+ try {
+ key = keySelector.getKey(record.getInstance().getValue());
+ } catch (Exception e) {
+ throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
+ }
+ returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
+
+ return returnArray;
+ }
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "HASH";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index e6ad821..2dfff0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -24,34 +24,26 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
* Partitioner that distributes the data equally by cycling through the output
* channels.
*
- * @param <T>
- * Type of the Tuple
+ * @param <T> Type of the elements in the Stream being rebalanced
*/
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private int[] returnArray = new int[] {-1};
- private boolean forward;
-
- public RebalancePartitioner(boolean forward) {
- super(forward ? PartitioningStrategy.FORWARD : PartitioningStrategy.DISTRIBUTE);
- this.forward = forward;
- }
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
-
return this.returnArray;
}
public StreamPartitioner<T> copy() {
- return new RebalancePartitioner<T>(forward);
+ return this;
}
@Override
public String toString() {
- return forward ? "ForwardPartitioner" : "RebalancePartitioner";
+ return "REBALANCE";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index ba50113..93c6f9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -36,14 +36,20 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private int[] returnArray = new int[1];
- public ShufflePartitioner() {
- super(PartitioningStrategy.SHUFFLE);
- }
-
@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return new ShufflePartitioner<T>();
+ }
+
+ @Override
+ public String toString() {
+ return "SHUFFLE";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
index b37655b..4ef360d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java
@@ -24,30 +24,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
public abstract class StreamPartitioner<T> implements
ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
-
- public enum PartitioningStrategy {
-
- FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY, CUSTOM
-
- }
-
private static final long serialVersionUID = 1L;
- private PartitioningStrategy strategy;
-
- public StreamPartitioner(PartitioningStrategy strategy) {
- this.strategy = strategy;
- }
-
- public PartitioningStrategy getStrategy() {
- return strategy;
- }
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName();
- }
+ public abstract StreamPartitioner<T> copy();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
index 3ad6b8e..fdf7697 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/ChainedRuntimeContextTest.java
@@ -24,19 +24,23 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
@SuppressWarnings("serial")
-public class ChainedRuntimeContextTest {
- private static final long MEMORYSIZE = 32;
+public class ChainedRuntimeContextTest extends StreamingMultipleProgramsTestBase {
private static RuntimeContext srcContext;
private static RuntimeContext mapContext;
@Test
public void test() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
- env.addSource(new TestSource()).map(new TestMap());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ env.addSource(new TestSource()).map(new TestMap()).addSink(new NoOpSink<Integer>());
env.execute();
assertNotEquals(srcContext, mapContext);
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 369b384..7ea1309 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -31,21 +31,21 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class CoStreamTest {
-
- private static final long MEMORY_SIZE = 32;
+public class CoStreamTest extends StreamingMultipleProgramsTestBase {
private static ArrayList<String> expected = new ArrayList<String>();
@Test
public void test() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
TestListResultSink<String> resultSink = new TestListResultSink<String>();
@@ -129,4 +129,4 @@ public class CoStreamTest {
assertEquals(expected, result);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bac21bf5/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 324143f..9775392 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -54,19 +54,20 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
-import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.NoOpSink;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-public class DataStreamTest {
+public class DataStreamTest extends StreamingMultipleProgramsTestBase {
- private static final long MEMORYSIZE = 32;
- private static int PARALLELISM = 2;
/**
* Tests {@link SingleOutputStreamOperator#name(String)} functionality.
@@ -75,7 +76,7 @@ public class DataStreamTest {
*/
@Test
public void testNaming() throws Exception {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> dataStream1 = env.generateSequence(0, 0).name("testSource1")
.map(new MapFunction<Long, Long>() {
@@ -93,7 +94,7 @@ public class DataStreamTest {
}
}).name("testMap");
- DataStream<Long> connected = dataStream1.connect(dataStream2)
+ DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
@Override
public void flatMap1(Long value, Collector<Long> out) throws Exception {
@@ -110,7 +111,8 @@ public class DataStreamTest {
return null;
}
}).name("testWindowFold")
- .flatten();
+ .flatten()
+ .print();
//test functionality through the operator names in the execution plan
String plan = env.getExecutionPlan();
@@ -130,8 +132,7 @@ public class DataStreamTest {
@Test
@SuppressWarnings("unchecked")
public void testPartitioning() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
- StreamGraph graph = env.getStreamGraph();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
@@ -143,10 +144,15 @@ public class DataStreamTest {
DataStream group3 = src1.groupBy("f0");
DataStream group4 = src1.groupBy(new FirstSelector());
- assertTrue(isPartitioned(graph.getStreamEdge(group1.getId(), createDownStreamId(group1))));
- assertTrue(isPartitioned(graph.getStreamEdge(group2.getId(), createDownStreamId(group2))));
- assertTrue(isPartitioned(graph.getStreamEdge(group3.getId(), createDownStreamId(group3))));
- assertTrue(isPartitioned(graph.getStreamEdge(group4.getId(), createDownStreamId(group4))));
+ int id1 = createDownStreamId(group1);
+ int id2 = createDownStreamId(group2);
+ int id3 = createDownStreamId(group3);
+ int id4 = createDownStreamId(group4);
+
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), id4)));
assertTrue(isGrouped(group1));
assertTrue(isGrouped(group2));
@@ -159,10 +165,15 @@ public class DataStreamTest {
DataStream partition3 = src1.partitionByHash("f0");
DataStream partition4 = src1.partitionByHash(new FirstSelector());
- assertTrue(isPartitioned(graph.getStreamEdge(partition1.getId(), createDownStreamId(partition1))));
- assertTrue(isPartitioned(graph.getStreamEdge(partition2.getId(), createDownStreamId(partition2))));
- assertTrue(isPartitioned(graph.getStreamEdge(partition3.getId(), createDownStreamId(partition3))));
- assertTrue(isPartitioned(graph.getStreamEdge(partition4.getId(), createDownStreamId(partition4))));
+ int pid1 = createDownStreamId(partition1);
+ int pid2 = createDownStreamId(partition2);
+ int pid3 = createDownStreamId(partition3);
+ int pid4 = createDownStreamId(partition4);
+
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), pid4)));
assertFalse(isGrouped(partition1));
assertFalse(isGrouped(partition3));
@@ -181,9 +192,13 @@ public class DataStreamTest {
DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
- assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition1.getId(), createDownStreamId(customPartition1))));
- assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition3.getId(), createDownStreamId(customPartition3))));
- assertTrue(isCustomPartitioned(graph.getStreamEdge(customPartition4.getId(), createDownStreamId(customPartition4))));
+ int cid1 = createDownStreamId(customPartition1);
+ int cid2 = createDownStreamId(customPartition3);
+ int cid3 = createDownStreamId(customPartition4);
+
+ assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid1)));
+ assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid2)));
+ assertTrue(isCustomPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), cid3)));
assertFalse(isGrouped(customPartition1));
assertFalse(isGrouped(customPartition3));
@@ -205,20 +220,20 @@ public class DataStreamTest {
ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
Integer downStreamId5 = createDownStreamId(connectedGroup5);
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getFirst().getId(), downStreamId1)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup1.getSecond().getId(), downStreamId1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId1)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getFirst().getId(), downStreamId2)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup2.getSecond().getId(), downStreamId2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId2)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getFirst().getId(), downStreamId3)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup3.getSecond().getId(), downStreamId3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId3)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getFirst().getId(), downStreamId4)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup4.getSecond().getId(), downStreamId4)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId4)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId4)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getFirst().getId(), downStreamId5)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedGroup5.getSecond().getId(), downStreamId5)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId5)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(), downStreamId5)));
assertTrue(isGrouped(connectedGroup1));
assertTrue(isGrouped(connectedGroup2));
@@ -242,20 +257,30 @@ public class DataStreamTest {
ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst().getId(), connectDownStreamId1)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond().getId(), connectDownStreamId1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+ connectDownStreamId1)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+ connectDownStreamId1)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst().getId(), connectDownStreamId2)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond().getId(), connectDownStreamId2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+ connectDownStreamId2)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+ connectDownStreamId2)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst().getId(), connectDownStreamId3)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond().getId(), connectDownStreamId3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+ connectDownStreamId3)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+ connectDownStreamId3)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst().getId(), connectDownStreamId4)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond().getId(), connectDownStreamId4)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+ connectDownStreamId4)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+ connectDownStreamId4)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst().getId(), connectDownStreamId5)));
- assertTrue(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond().getId(), connectDownStreamId5)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
+ connectDownStreamId5)));
+ assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src2.getId(),
+ connectDownStreamId5)));
assertFalse(isGrouped(connectedPartition1));
assertFalse(isGrouped(connectedPartition2));
@@ -269,17 +294,17 @@ public class DataStreamTest {
*/
@Test
public void testParallelism() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(10, MEMORYSIZE);
- StreamGraph graph = env.getStreamGraph();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ env.setParallelism(10);
SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
@Override
public Long map(Tuple2<Long, Long> value) throws Exception {
return null;
}
- });
+ }).name("MyMap");
DataStream<Long> windowed = map
.window(Count.of(10))
@@ -288,7 +313,10 @@ public class DataStreamTest {
public Long fold(Long accumulator, Long value) throws Exception {
return null;
}
- }).flatten();
+ })
+ .flatten();
+
+ windowed.addSink(new NoOpSink<Long>());
DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
@Override
@@ -296,16 +324,21 @@ public class DataStreamTest {
}
});
- assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
- assertEquals(10, graph.getStreamNode(map.getId()).getParallelism());
- assertEquals(10, graph.getStreamNode(windowed.getId()).getParallelism());
- assertEquals(10, graph.getStreamNode(sink.getId()).getParallelism());
+ assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
+ assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
+ assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
+ assertEquals(10,
+ env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
env.setParallelism(7);
- assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
- assertEquals(7, graph.getStreamNode(map.getId()).getParallelism());
- assertEquals(7, graph.getStreamNode(windowed.getId()).getParallelism());
- assertEquals(7, graph.getStreamNode(sink.getId()).getParallelism());
+
+ // Some parts, such as windowing, rely on the fact that previous operators have a parallelism
+ // set when instantiating the Discretizer. This would break if we dynamically changed
+ // the parallelism of operations when changing the setting on the Execution Environment.
+ assertEquals(1, env.getStreamGraph().getStreamNode(src.getId()).getParallelism());
+ assertEquals(10, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
+ assertEquals(1, env.getStreamGraph().getStreamNode(windowed.getId()).getParallelism());
+ assertEquals(10, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
try {
src.setParallelism(3);
@@ -314,21 +347,22 @@ public class DataStreamTest {
}
DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
- assertEquals(7, graph.getStreamNode(parallelSource.getId()).getParallelism());
+ parallelSource.addSink(new NoOpSink<Long>());
+ assertEquals(7, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
parallelSource.setParallelism(3);
- assertEquals(3, graph.getStreamNode(parallelSource.getId()).getParallelism());
+ assertEquals(3, env.getStreamGraph().getStreamNode(parallelSource.getId()).getParallelism());
map.setParallelism(2);
- assertEquals(2, graph.getStreamNode(map.getId()).getParallelism());
+ assertEquals(2, env.getStreamGraph().getStreamNode(map.getId()).getParallelism());
sink.setParallelism(4);
- assertEquals(4, graph.getStreamNode(sink.getId()).getParallelism());
+ assertEquals(4, env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getParallelism());
}
@Test
public void testTypeInfo() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> src1 = env.generateSequence(0, 0);
assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
@@ -366,9 +400,7 @@ public class DataStreamTest {
@Test
public void operatorTest() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-
- StreamGraph streamGraph = env.getStreamGraph();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);
@@ -379,6 +411,7 @@ public class DataStreamTest {
}
};
DataStream<Integer> map = src.map(mapFunction);
+ map.addSink(new NoOpSink<Integer>());
assertEquals(mapFunction, getFunctionForDataStream(map));
@@ -388,6 +421,7 @@ public class DataStreamTest {
}
};
DataStream<Integer> flatMap = src.flatMap(flatMapFunction);
+ flatMap.addSink(new NoOpSink<Integer>());
assertEquals(flatMapFunction, getFunctionForDataStream(flatMap));
FilterFunction<Integer> filterFunction = new FilterFunction<Integer>() {
@@ -401,16 +435,18 @@ public class DataStreamTest {
.union(flatMap)
.filter(filterFunction);
+ unionFilter.addSink(new NoOpSink<Integer>());
+
assertEquals(filterFunction, getFunctionForDataStream(unionFilter));
try {
- streamGraph.getStreamEdge(map.getId(), unionFilter.getId());
+ env.getStreamGraph().getStreamEdge(map.getId(), unionFilter.getId());
} catch (RuntimeException e) {
fail(e.getMessage());
}
try {
- streamGraph.getStreamEdge(flatMap.getId(), unionFilter.getId());
+ env.getStreamGraph().getStreamEdge(flatMap.getId(), unionFilter.getId());
} catch (RuntimeException e) {
fail(e.getMessage());
}
@@ -423,14 +459,15 @@ public class DataStreamTest {
};
SplitDataStream<Integer> split = unionFilter.split(outputSelector);
- List<OutputSelector<?>> outputSelectors = streamGraph.getStreamNode(split.getId()).getOutputSelectors();
+ split.select("dummy").addSink(new NoOpSink<Integer>());
+ List<OutputSelector<?>> outputSelectors = env.getStreamGraph().getStreamNode(unionFilter.getId()).getOutputSelectors();
assertEquals(1, outputSelectors.size());
assertEquals(outputSelector, outputSelectors.get(0));
DataStream<Integer> select = split.select("a");
DataStreamSink<Integer> sink = select.print();
- StreamEdge splitEdge = streamGraph.getStreamEdge(select.getId(), sink.getId());
+ StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
assertEquals("a", splitEdge.getSelectedNames().get(0));
ConnectedDataStream<Integer, Integer> connect = map.connect(flatMap);
@@ -446,16 +483,17 @@ public class DataStreamTest {
}
};
DataStream<String> coMap = connect.map(coMapper);
+ coMap.addSink(new NoOpSink<String>());
assertEquals(coMapper, getFunctionForDataStream(coMap));
try {
- streamGraph.getStreamEdge(map.getId(), coMap.getId());
+ env.getStreamGraph().getStreamEdge(map.getId(), coMap.getId());
} catch (RuntimeException e) {
fail(e.getMessage());
}
try {
- streamGraph.getStreamEdge(flatMap.getId(), coMap.getId());
+ env.getStreamGraph().getStreamEdge(flatMap.getId(), coMap.getId());
} catch (RuntimeException e) {
fail(e.getMessage());
}
@@ -463,12 +501,11 @@ public class DataStreamTest {
@Test
public void sinkKeyTest() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
- StreamGraph streamGraph = env.getStreamGraph();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Long> sink = env.generateSequence(1, 100).print();
- assertTrue(streamGraph.getStreamNode(sink.getId()).getStatePartitioner() == null);
- assertTrue(streamGraph.getStreamNode(sink.getId()).getInEdges().get(0).getPartitioner() instanceof RebalancePartitioner);
+ DataStreamSink<Long> sink = env.generateSequence(1, 100).print();
+ assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getStatePartitioner() == null);
+ assertTrue(env.getStreamGraph().getStreamNode(sink.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof ForwardPartitioner);
KeySelector<Long, Long> key1 = new KeySelector<Long, Long>() {
@@ -480,11 +517,11 @@ public class DataStreamTest {
}
};
- DataStream<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
+ DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
- assertTrue(streamGraph.getStreamNode(sink2.getId()).getStatePartitioner() != null);
- assertEquals(key1, streamGraph.getStreamNode(sink2.getId()).getStatePartitioner());
- assertTrue(streamGraph.getStreamNode(sink2.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);
+ assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner() != null);
+ assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
+ assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
KeySelector<Long, Long> key2 = new KeySelector<Long, Long>() {
@@ -496,49 +533,52 @@ public class DataStreamTest {
}
};
- DataStream<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
+ DataStreamSink<Long> sink3 = env.generateSequence(1, 100).keyBy(key2).print();
- assertTrue(streamGraph.getStreamNode(sink3.getId()).getStatePartitioner() != null);
- assertEquals(key2, streamGraph.getStreamNode(sink3.getId()).getStatePartitioner());
- assertTrue(streamGraph.getStreamNode(sink3.getId()).getInEdges().get(0).getPartitioner() instanceof FieldsPartitioner);
+ assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner() != null);
+ assertEquals(key2, env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getStatePartitioner());
+ assertTrue(env.getStreamGraph().getStreamNode(sink3.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
}
@Test
public void testChannelSelectors() {
- StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-
- StreamGraph streamGraph = env.getStreamGraph();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);
DataStream<Long> broadcast = src.broadcast();
DataStreamSink<Long> broadcastSink = broadcast.print();
StreamPartitioner<?> broadcastPartitioner =
- streamGraph.getStreamEdge(broadcast.getId(), broadcastSink.getId()).getPartitioner();
+ env.getStreamGraph().getStreamEdge(src.getId(),
+ broadcastSink.getTransformation().getId()).getPartitioner();
assertTrue(broadcastPartitioner instanceof BroadcastPartitioner);
DataStream<Long> shuffle = src.shuffle();
DataStreamSink<Long> shuffleSink = shuffle.print();
StreamPartitioner<?> shufflePartitioner =
- streamGraph.getStreamEdge(shuffle.getId(), shuffleSink.getId()).getPartitioner();
+ env.getStreamGraph().getStreamEdge(src.getId(),
+ shuffleSink.getTransformation().getId()).getPartitioner();
assertTrue(shufflePartitioner instanceof ShufflePartitioner);
DataStream<Long> forward = src.forward();
DataStreamSink<Long> forwardSink = forward.print();
StreamPartitioner<?> forwardPartitioner =
- streamGraph.getStreamEdge(forward.getId(), forwardSink.getId()).getPartitioner();
- assertTrue(forwardPartitioner instanceof RebalancePartitioner);
+ env.getStreamGraph().getStreamEdge(src.getId(),
+ forwardSink.getTransformation().getId()).getPartitioner();
+ assertTrue(forwardPartitioner instanceof ForwardPartitioner);
DataStream<Long> rebalance = src.rebalance();
DataStreamSink<Long> rebalanceSink = rebalance.print();
StreamPartitioner<?> rebalancePartitioner =
- streamGraph.getStreamEdge(rebalance.getId(), rebalanceSink.getId()).getPartitioner();
+ env.getStreamGraph().getStreamEdge(src.getId(),
+ rebalanceSink.getTransformation().getId()).getPartitioner();
assertTrue(rebalancePartitioner instanceof RebalancePartitioner);
DataStream<Long> global = src.global();
DataStreamSink<Long> globalSink = global.print();
StreamPartitioner<?> globalPartitioner =
- streamGraph.getStreamEdge(global.getId(), globalSink.getId()).getPartitioner();
+ env.getStreamGraph().getStreamEdge(src.getId(),
+ globalSink.getTransformation().getId()).getPartitioner();
assertTrue(globalPartitioner instanceof GlobalPartitioner);
}
@@ -559,7 +599,7 @@ public class DataStreamTest {
}
private static Integer createDownStreamId(DataStream dataStream) {
- return dataStream.print().getId();
+ return dataStream.print().getTransformation().getId();
}
private static boolean isGrouped(DataStream dataStream) {
@@ -567,7 +607,7 @@ public class DataStreamTest {
}
private static Integer createDownStreamId(ConnectedDataStream dataStream) {
- return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+ SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
@Override
public Object map1(Tuple2<Long, Long> value) {
return null;
@@ -577,7 +617,9 @@ public class DataStreamTest {
public Object map2(Tuple2<Long, Long> value) {
return null;
}
- }).getId();
+ });
+ coMap.addSink(new NoOpSink());
+ return coMap.getId();
}
private static boolean isGrouped(ConnectedDataStream dataStream) {
@@ -585,7 +627,7 @@ public class DataStreamTest {
}
private static boolean isPartitioned(StreamEdge edge) {
- return edge.getPartitioner() instanceof FieldsPartitioner;
+ return edge.getPartitioner() instanceof HashPartitioner;
}
private static boolean isCustomPartitioned(StreamEdge edge) {