Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:40 UTC
[06/13] flink git commit: [FLINK-2550] Rename ConnectedDataStream to
ConnectedStreams, Remove some operations
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
deleted file mode 100644
index dc6ea34..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamReduce<IN1, IN2, OUT>
- extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- protected IN1 currentValue1 = null;
- protected IN2 currentValue2 = null;
-
- // We keep track of watermarks from both inputs, the combined input is the minimum
- // Once the minimum advances we emit a new watermark for downstream operators
- private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
- public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- super(coReducer);
- currentValue1 = null;
- currentValue2 = null;
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- if (currentValue1 != null) {
- currentValue1 = userFunction.reduce1(currentValue1, element.getValue());
- } else {
- currentValue1 = element.getValue();
- }
- output.collect(element.replace(userFunction.map1(currentValue1)));
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- if (currentValue2 != null) {
- currentValue2 = userFunction.reduce2(currentValue2, element.getValue());
- } else {
- currentValue2 = element.getValue();
- }
- output.collect(element.replace(userFunction.map2(currentValue2)));
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-}
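
The operator deleted above also shows the standard two-input watermark pattern: each input's watermark is tracked separately, and a combined watermark is forwarded only when the minimum of the two advances. A minimal, self-contained Java sketch of just that logic, detached from the Flink operator API (class and method names are illustrative):

    // Sketch of the min-combining watermark logic from the deleted
    // CoStreamReduce (the same pattern appears in CoStreamWindow below).
    public class CombinedWatermarkTracker {

        private long combinedWatermark = Long.MIN_VALUE;
        private long input1Watermark = Long.MIN_VALUE;
        private long input2Watermark = Long.MIN_VALUE;

        // Record a watermark from input 1; returns true if the combined
        // watermark advanced and should be emitted downstream.
        public boolean advanceInput1(long timestamp) {
            input1Watermark = timestamp;
            return tryAdvance();
        }

        public boolean advanceInput2(long timestamp) {
            input2Watermark = timestamp;
            return tryAdvance();
        }

        private boolean tryAdvance() {
            // The combined watermark is the minimum over both inputs.
            long newMin = Math.min(input1Watermark, input2Watermark);
            if (newMin > combinedWatermark) {
                combinedWatermark = newMin;
                return true;
            }
            return false;
        }

        public long current() {
            return combinedWatermark;
        }
    }

A caller would emit new Watermark(tracker.current()) whenever advanceInput1/advanceInput2 returns true, exactly as processWatermark1/processWatermark2 do above.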
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
deleted file mode 100644
index 4bfe2ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.state.CircularFifoList;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamWindow<IN1, IN2, OUT>
- extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- protected long windowSize;
- protected long slideSize;
- protected CircularFifoList<StreamRecord<IN1>> circularList1;
- protected CircularFifoList<StreamRecord<IN2>> circularList2;
- protected TimestampWrapper<IN1> timeStamp1;
- protected TimestampWrapper<IN2> timeStamp2;
-
- protected StreamWindow window;
-
- protected long startTime;
- protected long nextRecordTime;
-
- // We keep track of watermarks from both inputs, the combined input is the minimum
- // Once the minimum advances we emit a new watermark for downstream operators
- private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
- public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
- long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
- super(coWindowFunction);
- this.windowSize = windowSize;
- this.slideSize = slideInterval;
- this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
- this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
- this.timeStamp1 = timeStamp1;
- this.timeStamp2 = timeStamp2;
- this.startTime = timeStamp1.getStartTime();
-
- this.window = new StreamWindow();
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- window.addToBuffer1(element.getValue());
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- window.addToBuffer2(element.getValue());
- }
-
- @SuppressWarnings("unchecked")
- protected void callUserFunction() throws Exception {
-
- List<IN1> first = new ArrayList<IN1>();
- List<IN2> second = new ArrayList<IN2>();
-
- // TODO: Give operators a way to copy elements
-
- for (IN1 element : window.circularList1.getElements()) {
- first.add(element);
- }
- for (IN2 element : window.circularList2.getElements()) {
- second.add(element);
- }
-
- TimestampedCollector<OUT> timestampedCollector = new TimestampedCollector<OUT>(output);
- timestampedCollector.setTimestamp(System.currentTimeMillis());
- if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
- userFunction.coWindow(first, second, timestampedCollector);
- }
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- protected class StreamWindow implements Serializable {
- private static final long serialVersionUID = 1L;
-
- protected int granularity;
- protected int batchPerSlide;
- protected long numberOfBatches;
-
- protected long minibatchCounter;
-
- protected CircularFifoList<IN1> circularList1;
- protected CircularFifoList<IN2> circularList2;
-
- public StreamWindow() {
- this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
- this.batchPerSlide = (int) (slideSize / granularity);
- this.numberOfBatches = windowSize / granularity;
- this.circularList1 = new CircularFifoList<IN1>();
- this.circularList2 = new CircularFifoList<IN2>();
- this.minibatchCounter = 0;
- }
-
- public void addToBuffer1(IN1 nextValue) throws Exception {
- checkWindowEnd(timeStamp1.getTimestamp(nextValue));
- if (minibatchCounter >= 0) {
- circularList1.add(nextValue);
- }
- }
-
- public void addToBuffer2(IN2 nextValue) throws Exception {
- checkWindowEnd(timeStamp2.getTimestamp(nextValue));
- if (minibatchCounter >= 0) {
- circularList2.add(nextValue);
- }
- }
-
- protected synchronized void checkWindowEnd(long timeStamp) throws Exception{
- nextRecordTime = timeStamp;
-
- while (miniBatchEnd()) {
- circularList1.newSlide();
- circularList2.newSlide();
- minibatchCounter++;
- if (windowEnd()) {
- callUserFunction();
- circularList1.shiftWindow(batchPerSlide);
- circularList2.shiftWindow(batchPerSlide);
- }
- }
- }
-
- protected boolean miniBatchEnd() {
- if (nextRecordTime < startTime + granularity) {
- return false;
- } else {
- startTime += granularity;
- return true;
- }
- }
-
- public boolean windowEnd() {
- if (minibatchCounter == numberOfBatches) {
- minibatchCounter -= batchPerSlide;
- return true;
- }
- return false;
- }
-
- public void reduceLastBatch() throws Exception{
- if (!miniBatchEnd()) {
- callUserFunction();
- }
- }
-
- public Iterable<IN1> getIterable1() {
- return circularList1.getIterable();
- }
-
- public Iterable<IN2> getIterable2() {
- return circularList2.getIterable();
- }
-
- @Override
- public String toString() {
- return circularList1.toString();
- }
-
- }
-
- @Override
- public void close() throws Exception {
- if (!window.miniBatchEnd()) {
- try {
- callUserFunction();
- } catch (Exception e) {
- throw new RuntimeException("Could not call user function in CoStreamWindow.close()", e);
- }
- }
- super.close();
- }
-
- public void setSlideSize(long slideSize) {
- this.slideSize = slideSize;
- }
-
-}
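
The StreamWindow inner class above slices both inputs into mini-batches whose length is the greatest common divisor of the window size and the slide size, so every window boundary and every slide boundary coincides with a mini-batch boundary. A self-contained sketch of that arithmetic, using java.math.BigInteger's gcd in place of the commons-math MathUtils used above (the concrete sizes are illustrative):

    import java.math.BigInteger;

    // Mini-batch bookkeeping as in the deleted CoStreamWindow.StreamWindow.
    public class WindowGranularity {
        public static void main(String[] args) {
            long windowSize = 1000; // window length
            long slideSize = 300;   // slide interval

            // gcd(1000, 300) = 100: the mini-batch length
            long granularity = BigInteger.valueOf(windowSize)
                    .gcd(BigInteger.valueOf(slideSize)).longValue();

            long batchPerSlide = slideSize / granularity;    // 3 mini-batches per slide
            long numberOfBatches = windowSize / granularity; // 10 mini-batches per window

            System.out.println(granularity + " " + batchPerSlide + " " + numberOfBatches);
        }
    }

On every slide the window is shifted by batchPerSlide mini-batches, which is what shiftWindow(batchPerSlide) implements above.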
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 9775392..337d97b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -127,7 +126,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
/**
* Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
- * different and correct topologies. Does the some for the {@link ConnectedDataStream}.
+ * different and correct topologies. Does the same for the {@link ConnectedStreams}.
*/
@Test
@SuppressWarnings("unchecked")
@@ -136,7 +135,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
- ConnectedDataStream connected = src1.connect(src2);
+ ConnectedStreams connected = src1.connect(src2);
//Testing DataStream grouping
DataStream group1 = src1.groupBy(0);
@@ -204,20 +203,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertFalse(isGrouped(customPartition3));
assertFalse(isGrouped(customPartition4));
- //Testing ConnectedDataStream grouping
- ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
+ //Testing ConnectedStreams grouping
+ ConnectedStreams connectedGroup1 = connected.groupBy(0, 0);
Integer downStreamId1 = createDownStreamId(connectedGroup1);
- ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
+ ConnectedStreams connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
Integer downStreamId2 = createDownStreamId(connectedGroup2);
- ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
+ ConnectedStreams connectedGroup3 = connected.groupBy("f0", "f0");
Integer downStreamId3 = createDownStreamId(connectedGroup3);
- ConnectedDataStream connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+ ConnectedStreams connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
Integer downStreamId4 = createDownStreamId(connectedGroup4);
- ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+ ConnectedStreams connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
Integer downStreamId5 = createDownStreamId(connectedGroup5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -241,20 +240,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isGrouped(connectedGroup4));
assertTrue(isGrouped(connectedGroup5));
- //Testing ConnectedDataStream partitioning
- ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
+ //Testing ConnectedStreams partitioning
+ ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
- ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
+ ConnectedStreams connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
- ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
+ ConnectedStreams connectedPartition3 = connected.partitionByHash("f0", "f0");
Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
- ConnectedDataStream connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+ ConnectedStreams connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
- ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+ ConnectedStreams connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
@@ -470,7 +469,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
assertEquals("a", splitEdge.getSelectedNames().get(0));
- ConnectedDataStream<Integer, Integer> connect = map.connect(flatMap);
+ ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
@Override
public String map1(Integer value) {
@@ -606,7 +605,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return dataStream instanceof GroupedDataStream;
}
- private static Integer createDownStreamId(ConnectedDataStream dataStream) {
+ private static Integer createDownStreamId(ConnectedStreams dataStream) {
SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
@Override
public Object map1(Tuple2<Long, Long> value) {
@@ -622,8 +621,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
return coMap.getId();
}
- private static boolean isGrouped(ConnectedDataStream dataStream) {
- return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
+ private static boolean isGrouped(ConnectedStreams dataStream) {
+ return (dataStream.getFirstInput() instanceof GroupedDataStream && dataStream.getSecondInput() instanceof GroupedDataStream);
}
private static boolean isPartitioned(StreamEdge edge) {
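
As the updated assertions show, the rename also replaces the accessors getFirst()/getSecond() with getFirstInput()/getSecondInput(). A minimal usage sketch of the renamed API, consistent with the test code above (stream contents are illustrative):

    DataStream<Integer> first = env.fromElements(1, 2, 3);
    DataStream<String> second = env.fromElements("a", "b");

    ConnectedStreams<Integer, String> connected = first.connect(second);

    DataStream<String> result = connected.map(new CoMapFunction<Integer, String, String>() {
        @Override
        public String map1(Integer value) {
            return String.valueOf(value); // called for elements of the first input
        }

        @Override
        public String map2(String value) {
            return value; // called for elements of the second input
        }
    });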
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 285ee57..774f58d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -112,7 +112,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
- ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);
@@ -151,7 +151,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
IterativeDataStream<Integer> iter1 = source.iterate();
- ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+ ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);
@@ -181,7 +181,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
IterativeDataStream<Integer> iter1 = source.iterate();
// Calling withFeedbackType should create a new iteration
- ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
+ ConnectedIterativeDataStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
iter1.closeWith(iter1.map(NoOpIntMap)).print();
iter2.closeWith(iter2.map(NoOpCoMap)).print();
@@ -395,7 +395,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
.map(NoOpStrMap).name("ParallelizeMap");
- ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0)
+ ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
.map(NoOpIntMap).name("ParallelizeMap")
.iterate(2000)
.withFeedbackType("String");
@@ -403,7 +403,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
try {
coIt.groupBy(1, 2);
fail();
- } catch (UnsupportedOperationException e) {
+ } catch (InvalidProgramException e) {
// this is expected
}
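
The exception-type change above reflects that grouping a connected iteration head is now rejected as an invalid program rather than an unsupported operation. Condensed, the pattern the test exercises looks like this (NoOpIntMap is the test's own helper mapper):

    ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
            .map(NoOpIntMap)
            .iterate(2000)
            .withFeedbackType("String");

    try {
        coIt.groupBy(1, 2); // grouping the iteration head is not allowed
        fail();
    } catch (InvalidProgramException e) {
        // expected: the program is invalid, not merely unsupported
    }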
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 0989128..4c0f59f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -30,8 +29,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
@@ -72,12 +69,6 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
fail();
} catch (Exception e) {
}
- try {
- source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
- .print();
- fail();
- } catch (Exception e) {
- }
env.addSource(new TestSource<Integer>()).returns("Integer");
source.map(new TestMap<Long, Long>()).returns(Long.class).print();
@@ -86,9 +77,6 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
.returns(BasicTypeInfo.INT_TYPE_INFO).print();
- source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
- .returns("String").print();
-
assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
@@ -161,38 +149,4 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
}
}
-
- private class TestCoReduce<IN1, IN2, OUT> implements CoReduceFunction<IN1, IN2, OUT> {
-
- @Override
- public IN1 reduce1(IN1 value1, IN1 value2) {
- return null;
- }
-
- @Override
- public IN2 reduce2(IN2 value1, IN2 value2) {
- return null;
- }
-
- @Override
- public OUT map1(IN1 value) {
- return null;
- }
-
- @Override
- public OUT map2(IN2 value) {
- return null;
- }
-
- }
-
- private class TestCoWindow<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-
- @Override
- public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
- throws Exception {
- }
-
- }
-
}
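
With the CoReduce and CoWindow variants gone, the test now covers the three remaining ways to supply a missing output type, all via returns(). Collected in one sketch (TestSource, TestMap, and TestCoFlatMap are the test's own helpers):

    // by type name
    env.addSource(new TestSource<Integer>()).returns("Integer");

    // by Class
    source.map(new TestMap<Long, Long>()).returns(Long.class).print();

    // by TypeInformation
    source.connect(source)
          .flatMap(new TestCoFlatMap<Long, Long, Integer>())
          .returns(BasicTypeInfo.INT_TYPE_INFO)
          .print();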
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index 508f1a2..0137682 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -115,23 +115,6 @@ public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
.map(new ResultMap())
.addSink(joinResultSink);
- inStream1
- .cross(inStream2)
- .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
- new MyTimestamp<Tuple1<Integer>>(), 100)
- .with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(
- Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
- return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
- }
- })
- .map(new ResultMap())
- .addSink(crossResultSink);
-
env.execute();
assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3b05274..2246ffd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -220,7 +220,7 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
DataStream<Integer> source1 = env.fromElements(1, 10);
DataStream<Integer> source2 = env.fromElements(2, 11);
- ConnectedDataStream<Integer, Integer> connectedSource = source1.connect(source2);
+ ConnectedStreams<Integer, Integer> connectedSource = source1.connect(source2);
OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
deleted file mode 100644
index a1e9f74..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
-import org.apache.flink.util.Collector
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
- /**
- * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
- * the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input and
- * @param fun2 for each element of the second input. Each
- * CoMapFunction call returns exactly one element.
- *
- * The CoMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
- DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("Map function must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val comapper = new CoMapFunction[IN1, IN2, R] {
- def map1(in1: IN1): R = cleanFun1(in1)
- def map2(in2: IN2): R = cleanFun2(in2)
- }
-
- map(comapper)
- }
-
- /**
- * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
- * the output to a common type. The transformation calls a
- * {@link CoMapFunction#map1} for each element of the first input and
- * {@link CoMapFunction#map2} for each element of the second input. Each
- * CoMapFunction call returns exactly one element. The user can also extend
- * {@link RichCoMapFunction} to gain access to other features provided by
- * the {@link RichFuntion} interface.
- *
- * @param coMapper
- * The CoMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]):
- DataStream[R] = {
- if (coMapper == null) {
- throw new NullPointerException("Map function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
- * maps the output to a common type. The transformation calls a
- * {@link CoFlatMapFunction#flatMap1} for each element of the first input
- * and {@link CoFlatMapFunction#flatMap2} for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none. The user can also extend {@link RichFlatMapFunction} to
- * gain access to other features provided by the {@link RichFuntion}
- * interface.
- *
- * @param coFlatMapper
- * The CoFlatMapFunction used to jointly transform the two input
- * DataStreams
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
- DataStream[R] = {
- if (coFlatMapper == null) {
- throw new NullPointerException("FlatMap function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
- * maps the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input
- * and @param fun2 for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none.
- *
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit,
- fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("FlatMap functions must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
- def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
- def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
- }
- flatMap(flatMapper)
- }
-
- /**
- * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
- * maps the output to a common type. The transformation calls a
- * @param fun1 for each element of the first input
- * and @param fun2 for each element of the second
- * input. Each CoFlatMapFunction call returns any number of elements
- * including none.
- *
- * @return The transformed { @link DataStream}
- */
- def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
- fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
- if (fun1 == null || fun2 == null) {
- throw new NullPointerException("FlatMap functions must not be null.")
- }
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
- def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
- def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
- }
- flatMap(flatMapper)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 according to keyPosition1 and keyPosition2. Used for
- * applying function on grouped data streams for example
- * {@link ConnectedDataStream#reduce}
- *
- * @param keyPosition1
- * The field used to compute the hashcode of the elements in the
- * first input stream.
- * @param keyPosition2
- * The field used to compute the hashcode of the elements in the
- * second input stream.
- * @return @return The transformed { @link ConnectedDataStream}
- */
- def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
- javaStream.groupBy(keyPosition1, keyPosition2)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 according to keyPositions1 and keyPositions2. Used for
- * applying function on grouped data streams for example
- * {@link ConnectedDataStream#reduce}
- *
- * @param keyPositions1
- * The fields used to group the first input stream.
- * @param keyPositions2
- * The fields used to group the second input stream.
- * @return @return The transformed { @link ConnectedDataStream}
- */
- def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
- ConnectedDataStream[IN1, IN2] = {
- javaStream.groupBy(keyPositions1, keyPositions2)
- }
-
- /**
- * GroupBy operation for connected data stream using key expressions. Groups
- * the elements of input1 and input2 according to field1 and field2. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field1
- * The grouping expression for the first input
- * @param field2
- * The grouping expression for the second input
- * @return The grouped { @link ConnectedDataStream}
- */
- def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
- javaStream.groupBy(field1, field2)
- }
-
- /**
- * GroupBy operation for connected data stream using key expressions. Groups
- * the elements of input1 and input2 according to fields1 and fields2. A
- * field expression is either the name of a public field or a getter method
- * with parentheses of the {@link DataStream}S underlying type. A dot can be
- * used to drill down into objects, as in {@code "field1.getInnerField2()" }
- * .
- *
- * @param fields1
- * The grouping expressions for the first input
- * @param fields2
- * The grouping expressions for the second input
- * @return The grouped { @link ConnectedDataStream}
- */
- def groupBy(fields1: Array[String], fields2: Array[String]):
- ConnectedDataStream[IN1, IN2] = {
- javaStream.groupBy(fields1, fields2)
- }
-
- /**
- * GroupBy operation for connected data stream. Groups the elements of
- * input1 and input2 using fun1 and fun2. Used for applying
- * function on grouped data streams for example
- * {@link ConnectedDataStream#reduce}
- *
- * @param fun1
- * The function used for grouping the first input
- * @param fun2
- * The function used for grouping the second input
- * @return The grouped { @link ConnectedDataStream}
- */
- def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
- ConnectedDataStream[IN1, IN2] = {
-
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
- val keyExtractor1 = new KeySelector[IN1, K] {
- def getKey(in: IN1) = cleanFun1(in)
- }
- val keyExtractor2 = new KeySelector[IN2, L] {
- def getKey(in: IN2) = cleanFun2(in)
- }
-
- javaStream.groupBy(keyExtractor1, keyExtractor2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 according to keyPosition1 and keyPosition2.
- *
- * @param keyPosition1
- * The field used to compute the hashcode of the elements in the
- * first input stream.
- * @param keyPosition2
- * The field used to compute the hashcode of the elements in the
- * second input stream.
- * @return The transformed { @link ConnectedDataStream}
- */
- def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
- javaStream.partitionByHash(keyPosition1, keyPosition2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 according to keyPositions1 and keyPositions2.
- *
- * @param keyPositions1
- * The fields used to partition the first input stream.
- * @param keyPositions2
- * The fields used to partition the second input stream.
- * @return The transformed { @link ConnectedDataStream}
- */
- def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
- ConnectedDataStream[IN1, IN2] = {
- javaStream.partitionByHash(keyPositions1, keyPositions2)
- }
-
- /**
- * PartitionBy operation for connected data stream using key expressions. Partitions
- * the elements of input1 and input2 according to field1 and field2. A field
- * expression is either the name of a public field or a getter method with
- * parentheses of the {@link DataStream}S underlying type. A dot can be used
- * to drill down into objects, as in {@code "field1.getInnerField2()" }.
- *
- * @param field1
- * The partitioning expression for the first input
- * @param field2
- * The partitioning expression for the second input
- * @return The grouped { @link ConnectedDataStream}
- */
- def partitionByHash(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
- javaStream.partitionByHash(field1, field2)
- }
-
- /**
- * PartitionBy operation for connected data stream using key expressions. Partitions
- * the elements of input1 and input2 according to fields1 and fields2.
- *
- * @param fields1
- * The partitioning expressions for the first input
- * @param fields2
- * The partitioning expressions for the second input
- * @return The partitioned { @link ConnectedDataStream}
- */
- def partitionByHash(fields1: Array[String], fields2: Array[String]):
- ConnectedDataStream[IN1, IN2] = {
- javaStream.partitionByHash(fields1, fields2)
- }
-
- /**
- * PartitionBy operation for connected data stream. Partitions the elements of
- * input1 and input2 using fun1 and fun2.
- *
- * @param fun1
- * The function used for partitioning the first input
- * @param fun2
- * The function used for partitioning the second input
- * @return The partitioned { @link ConnectedDataStream}
- */
- def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
- ConnectedDataStream[IN1, IN2] = {
-
- val cleanFun1 = clean(fun1)
- val cleanFun2 = clean(fun2)
-
- val keyExtractor1 = new KeySelector[IN1, K] {
- def getKey(in: IN1) = cleanFun1(in)
- }
- val keyExtractor2 = new KeySelector[IN2, L] {
- def getKey(in: IN2) = cleanFun2(in)
- }
-
- javaStream.partitionByHash(keyExtractor1, keyExtractor2)
- }
-
- /**
- * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
- * the outputs to a common type. If the {@link ConnectedDataStream} is
- * batched or windowed then the reduce transformation is applied on every
- * sliding batch/window of the data stream. If the connected data stream is
- * grouped then the reducer is applied on every group of elements sharing
- * the same key. This type of reduce is much faster than reduceGroup since
- * the reduce function can be applied incrementally.
- *
- * @param coReducer
- * The { @link CoReduceFunction} that will be called for every
- * element of the inputs.
- * @return The transformed { @link DataStream}.
- */
- def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]):
- DataStream[R] = {
- if (coReducer == null) {
- throw new NullPointerException("Reduce function must not be null.")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- javaStream.reduce(coReducer).returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
- * the outputs to a common type. If the {@link ConnectedDataStream} is
- * batched or windowed then the reduce transformation is applied on every
- * sliding batch/window of the data stream. If the connected data stream is
- * grouped then the reducer is applied on every group of elements sharing
- * the same key. This type of reduce is much faster than reduceGroup since
- * the reduce function can be applied incrementally.
- *
- * @return The transformed { @link DataStream}.
- */
- def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
- reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
- if (mapper1 == null || mapper2 == null) {
- throw new NullPointerException("Map functions must not be null.")
- }
- if (reducer1 == null || reducer2 == null) {
- throw new NullPointerException("Reduce functions must not be null.")
- }
-
- val cleanReducer1 = clean(reducer1)
- val cleanReducer2 = clean(reducer2)
- val cleanMapper1 = clean(mapper1)
- val cleanMapper2 = clean(mapper2)
-
- val reducer = new CoReduceFunction[IN1, IN2, R] {
- def reduce1(value1: IN1, value2: IN1): IN1 = cleanReducer1(value1, value2)
- def reduce2(value1: IN2, value2: IN2): IN2 = cleanReducer2(value1, value2)
- def map1(value: IN1): R = cleanMapper1(value)
- def map2(value: IN2): R = cleanMapper2(value)
- }
- reduce(reducer)
- }
-
- /**
- * Applies a CoWindow transformation on the connected DataStreams. The
- * transformation calls the {@link CoWindowFunction#coWindow} method for for
- * time aligned windows of the two data streams. System time is used as
- * default to compute windows.
- *
- * @param coWindowFunction
- * The { @link CoWindowFunction} that will be applied for the time
- * windows.
- * @param windowSize
- * Size of the windows that will be aligned for both streams in
- * milliseconds.
- * @param slideInterval
- * After every function call the windows will be slid by this
- * interval.
- *
- * @return The transformed { @link DataStream}.
- */
- def windowReduce[R: TypeInformation: ClassTag](coWindowFunction:
- CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long):
- DataStream[R] = {
- if (coWindowFunction == null) {
- throw new NullPointerException("CoWindow function must no be null")
- }
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
- javaStream.windowReduce(coWindowFunction, windowSize, slideInterval).
- returns(outType).asInstanceOf[JavaStream[R]]
- }
-
- /**
- * Applies a CoWindow transformation on the connected DataStreams. The
- * transformation calls the {@link CoWindowFunction#coWindow} method for for
- * time aligned windows of the two data streams. System time is used as
- * default to compute windows.
- *
- * @param coWindower
- * The coWindowing function to be applied for the time windows.
- * @param windowSize
- * Size of the windows that will be aligned for both streams in
- * milliseconds.
- * @param slideInterval
- * After every function call the windows will be slid by this
- * interval.
- *
- * @return The transformed { @link DataStream}.
- */
- def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2],
- Collector[R]) => Unit, windowSize: Long, slideInterval: Long):
- DataStream[R] = {
- if (coWindower == null) {
- throw new NullPointerException("CoWindow function must no be null")
- }
-
- val cleanCoWindower = clean(coWindower)
-
- val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
- def coWindow(first: util.List[IN1], second: util.List[IN2],
- out: Collector[R]): Unit = cleanCoWindower(first.asScala, second.asScala, out)
- }
-
- windowReduce(coWindowFun, windowSize, slideInterval)
- }
-
- /**
- * Returns the first {@link DataStream}.
- *
- * @return The first DataStream.
- */
- def getFirst(): DataStream[IN1] = {
- javaStream.getFirst
- }
-
- /**
- * Returns the second {@link DataStream}.
- *
- * @return The second DataStream.
- */
- def getSecond(): DataStream[IN2] = {
- javaStream.getSecond
- }
-
- /**
- * Gets the type of the first input
- *
- * @return The type of the first input
- */
- def getInputType1(): TypeInformation[IN1] = {
- javaStream.getType1
- }
-
- /**
- * Gets the type of the second input
- *
- * @return The type of the second input
- */
- def getInputType2(): TypeInformation[IN2] = {
- javaStream.getType2
- }
-
- /**
- * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
- * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
- */
- private[flink] def clean[F <: AnyRef](f: F): F = {
- new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
- }
-
-}
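
For reference, the semantics that the removed reduce helpers wrapped: each input keeps a running value, folds every new element into it with reduce1/reduce2, and emits map1/map2 of the updated value immediately. A self-contained Java sketch of those semantics, detached from the Flink API (all names illustrative):

    import java.util.function.BinaryOperator;
    import java.util.function.Consumer;
    import java.util.function.Function;

    // Re-statement of the removed CoReduceFunction contract.
    class CoReduceSketch<IN1, IN2, OUT> {
        private IN1 current1; // running value of the first input
        private IN2 current2; // running value of the second input

        private final BinaryOperator<IN1> reduce1;
        private final BinaryOperator<IN2> reduce2;
        private final Function<IN1, OUT> map1;
        private final Function<IN2, OUT> map2;

        CoReduceSketch(BinaryOperator<IN1> reduce1, BinaryOperator<IN2> reduce2,
                       Function<IN1, OUT> map1, Function<IN2, OUT> map2) {
            this.reduce1 = reduce1;
            this.reduce2 = reduce2;
            this.map1 = map1;
            this.map2 = map2;
        }

        void onElement1(IN1 value, Consumer<OUT> out) {
            current1 = (current1 == null) ? value : reduce1.apply(current1, value);
            out.accept(map1.apply(current1)); // emit after every element
        }

        void onElement2(IN2 value, Consumer<OUT> out) {
            current2 = (current2 == null) ? value : reduce2.apply(current2, value);
            out.accept(map2.apply(current2));
        }
    }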
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
new file mode 100644
index 0000000..41c1a7a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+ * [[ConnectedStreams]] represents two connected streams of (possibly) different data types. It
+ * can be used to apply transformations such as [[CoMapFunction]] on two
+ * [[DataStream]]s.
+ */
+class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
+
+ /**
+ * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+ * the output to a common type. The transformation calls a
+ * @param fun1 for each element of the first input and
+ * @param fun2 for each element of the second input. Each
+ * CoMapFunction call returns exactly one element.
+ *
+ * The CoMapFunction used to jointly transform the two input
+ * DataStreams
+ * @return The transformed { @link DataStream}
+ */
+ def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
+ DataStream[R] = {
+ if (fun1 == null || fun2 == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+ val cleanFun1 = clean(fun1)
+ val cleanFun2 = clean(fun2)
+ val comapper = new CoMapFunction[IN1, IN2, R] {
+ def map1(in1: IN1): R = cleanFun1(in1)
+ def map2(in2: IN2): R = cleanFun2(in2)
+ }
+
+ map(comapper)
+ }
+
+ /**
+ * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+ * the output to a common type. The transformation calls a
+ * {@link CoMapFunction#map1} for each element of the first input and
+ * {@link CoMapFunction#map2} for each element of the second input. Each
+ * CoMapFunction call returns exactly one element. The user can also extend
+ * {@link RichCoMapFunction} to gain access to other features provided by
+ * the {@link RichFunction} interface.
+ *
+ * @param coMapper
+ * The CoMapFunction used to jointly transform the two input
+ * DataStreams
+ * @return The transformed { @link DataStream}
+ */
+ def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]):
+ DataStream[R] = {
+ if (coMapper == null) {
+ throw new NullPointerException("Map function must not be null.")
+ }
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
+ }
+
+ /**
+ * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+ * maps the output to a common type. The transformation calls a
+ * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+ * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+ * input. Each CoFlatMapFunction call returns any number of elements
+ * including none. The user can also extend {@link RichFlatMapFunction} to
+ * gain access to other features provided by the {@link RichFunction}
+ * interface.
+ *
+ * @param coFlatMapper
+ * The CoFlatMapFunction used to jointly transform the two input
+ * DataStreams
+ * @return The transformed { @link DataStream}
+ */
+ def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
+ DataStream[R] = {
+ if (coFlatMapper == null) {
+ throw new NullPointerException("FlatMap function must not be null.")
+ }
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+ javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+ }
+
+ /**
+ * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+ * maps the output to a common type. The transformation calls a
+ * @param fun1 for each element of the first input
+ * and @param fun2 for each element of the second
+ * input. Each CoFlatMapFunction call returns any number of elements
+ * including none.
+ *
+ * @return The transformed { @link DataStream}
+ */
+ def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit,
+ fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+ if (fun1 == null || fun2 == null) {
+ throw new NullPointerException("FlatMap functions must not be null.")
+ }
+ val cleanFun1 = clean(fun1)
+ val cleanFun2 = clean(fun2)
+ val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+ def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
+ def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
+ }
+ flatMap(flatMapper)
+ }
+
+ /**
+ * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+ * maps the output to a common type. The transformation calls a
+ * @param fun1 for each element of the first input
+ * and @param fun2 for each element of the second
+ * input. Each CoFlatMapFunction call returns any number of elements
+ * including none.
+ *
+ * @return The transformed { @link DataStream}
+ */
+ def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+ fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+ if (fun1 == null || fun2 == null) {
+ throw new NullPointerException("FlatMap functions must not be null.")
+ }
+ val cleanFun1 = clean(fun1)
+ val cleanFun2 = clean(fun2)
+ val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+ def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
+ def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
+ }
+ flatMap(flatMapper)
+ }
+
+ /**
+ * GroupBy operation for connected data streams. Groups the elements of
+ * input1 and input2 according to keyPosition1 and keyPosition2. Used for
+ * applying functions on grouped data streams, for example
+ * {@link ConnectedStreams#reduce}.
+ *
+ * @param keyPosition1
+ * The field used to compute the hashcode of the elements in the
+ * first input stream.
+ * @param keyPosition2
+ * The field used to compute the hashcode of the elements in the
+ * second input stream.
+ * @return The grouped {@link ConnectedStreams}
+ */
+ def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+ javaStream.groupBy(keyPosition1, keyPosition2)
+ }
+
+ /**
+ * GroupBy operation for connected data streams. Groups the elements of
+ * input1 and input2 according to keyPositions1 and keyPositions2. Used for
+ * applying functions on grouped data streams, for example
+ * {@link ConnectedStreams#reduce}.
+ *
+ * @param keyPositions1
+ * The fields used to group the first input stream.
+ * @param keyPositions2
+ * The fields used to group the second input stream.
+ * @return The grouped {@link ConnectedStreams}
+ */
+ def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+ ConnectedStreams[IN1, IN2] = {
+ javaStream.groupBy(keyPositions1, keyPositions2)
+ }
+
+ /**
+ * GroupBy operation for connected data streams using key expressions. Groups
+ * the elements of input1 and input2 according to field1 and field2. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()"}.
+ *
+ * @param field1
+ * The grouping expression for the first input
+ * @param field2
+ * The grouping expression for the second input
+ * @return The grouped {@link ConnectedStreams}
+ */
+ def groupBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+ javaStream.groupBy(field1, field2)
+ }
+
+ /**
+ * GroupBy operation for connected data streams using key expressions. Groups
+ * the elements of input1 and input2 according to fields1 and fields2. A
+ * field expression is either the name of a public field or a getter method
+ * with parentheses of the {@link DataStream}'s underlying type. A dot can be
+ * used to drill down into objects, as in {@code "field1.getInnerField2()"}.
+ *
+ * @param fields1
+ * The grouping expressions for the first input
+ * @param fields2
+ * The grouping expressions for the second input
+ * @return The grouped {@link ConnectedStreams}
+ */
+ def groupBy(fields1: Array[String], fields2: Array[String]):
+ ConnectedStreams[IN1, IN2] = {
+ javaStream.groupBy(fields1, fields2)
+ }
+
+ /**
+ * GroupBy operation for connected data streams. Groups the elements of
+ * input1 and input2 using fun1 and fun2. Used for applying
+ * functions on grouped data streams, for example
+ * {@link ConnectedStreams#reduce}.
+ *
+ * @param fun1
+ * The function used for grouping the first input
+ * @param fun2
+ * The function used for grouping the second input
+ * @return The grouped {@link ConnectedStreams}
+ */
+ def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+ ConnectedStreams[IN1, IN2] = {
+
+ val cleanFun1 = clean(fun1)
+ val cleanFun2 = clean(fun2)
+ val keyExtractor1 = new KeySelector[IN1, K] {
+ def getKey(in: IN1) = cleanFun1(in)
+ }
+ val keyExtractor2 = new KeySelector[IN2, L] {
+ def getKey(in: IN2) = cleanFun2(in)
+ }
+
+ javaStream.groupBy(keyExtractor1, keyExtractor2)
+ }
+
+ /**
+ * PartitionBy operation for connected data streams. Partitions the elements of
+ * input1 and input2 according to keyPosition1 and keyPosition2.
+ *
+ * @param keyPosition1
+ * The field used to compute the hashcode of the elements in the
+ * first input stream.
+ * @param keyPosition2
+ * The field used to compute the hashcode of the elements in the
+ * second input stream.
+ * @return The transformed {@link ConnectedStreams}
+ */
+ def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+ javaStream.partitionByHash(keyPosition1, keyPosition2)
+ }
+
+ /**
+ * PartitionBy operation for connected data streams. Partitions the elements of
+ * input1 and input2 according to keyPositions1 and keyPositions2.
+ *
+ * @param keyPositions1
+ * The fields used to partition the first input stream.
+ * @param keyPositions2
+ * The fields used to partition the second input stream.
+ * @return The transformed {@link ConnectedStreams}
+ */
+ def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+ ConnectedStreams[IN1, IN2] = {
+ javaStream.partitionByHash(keyPositions1, keyPositions2)
+ }
+
+ /**
+ * PartitionBy operation for connected data streams using key expressions. Partitions
+ * the elements of input1 and input2 according to field1 and field2. A field
+ * expression is either the name of a public field or a getter method with
+ * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+ * to drill down into objects, as in {@code "field1.getInnerField2()"}.
+ *
+ * @param field1
+ * The partitioning expression for the first input
+ * @param field2
+ * The partitioning expression for the second input
+ * @return The partitioned {@link ConnectedStreams}
+ */
+ def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+ javaStream.partitionByHash(field1, field2)
+ }
+
+ /**
+ * PartitionBy operation for connected data streams using key expressions. Partitions
+ * the elements of input1 and input2 according to fields1 and fields2.
+ *
+ * @param fields1
+ * The partitioning expressions for the first input
+ * @param fields2
+ * The partitioning expressions for the second input
+ * @return The partitioned {@link ConnectedStreams}
+ */
+ def partitionByHash(fields1: Array[String], fields2: Array[String]):
+ ConnectedStreams[IN1, IN2] = {
+ javaStream.partitionByHash(fields1, fields2)
+ }
+
+ /**
+ * PartitionBy operation for connected data streams. Partitions the elements of
+ * input1 and input2 using fun1 and fun2.
+ *
+ * @param fun1
+ * The function used for partitioning the first input
+ * @param fun2
+ * The function used for partitioning the second input
+ * @return The partitioned {@link ConnectedStreams}
+ */
+ def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+ ConnectedStreams[IN1, IN2] = {
+
+ val cleanFun1 = clean(fun1)
+ val cleanFun2 = clean(fun2)
+
+ val keyExtractor1 = new KeySelector[IN1, K] {
+ def getKey(in: IN1) = cleanFun1(in)
+ }
+ val keyExtractor2 = new KeySelector[IN2, L] {
+ def getKey(in: IN2) = cleanFun2(in)
+ }
+
+ javaStream.partitionByHash(keyExtractor1, keyExtractor2)
+ }
+
+ /**
+ * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+ * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}.
+ */
+ private[flink] def clean[F <: AnyRef](f: F): F = {
+ new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+ }
+
+}
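
For illustration, a minimal usage sketch of the renamed Scala API above, assuming only what is visible in this commit (DataStream#connect returning ConnectedStreams, the function-based groupBy, the two-function flatMap, and the implicit TypeInformation from the scala package object); the object name, environment setup, and element values are illustrative:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object ConnectedStreamsSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val words: DataStream[String] = env.fromElements("one", "two")
        val counts: DataStream[Int] = env.fromElements(1, 2)

        // connect pairs two streams of different types into ConnectedStreams[String, Int]
        val connected: ConnectedStreams[String, Int] = words.connect(counts)

        // function-based groupBy from above: key each input independently
        val grouped: ConnectedStreams[String, Int] = connected.groupBy(w => w, c => c)

        // two-function flatMap from above: each call may emit any number of elements
        val merged: DataStream[String] = grouped.flatMap(
          (w: String, out: Collector[String]) => out.collect(w),
          (c: Int, out: Collector[String]) => out.collect(c.toString))

        merged.print()
        env.execute()
      }
    }
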
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index ca5fc48..c9aee61 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -203,11 +203,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
javaStream.union(dataStreams.map(_.getJavaStream): _*)
/**
- * Creates a new ConnectedDataStream by connecting
+ * Creates a new ConnectedStreams by connecting
* DataStream outputs of different type with each other. The
* DataStreams connected using this operator can be used with CoFunctions.

*/
- def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] =
+ def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
javaStream.connect(dataStream.getJavaStream)
@@ -408,7 +408,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* stream of the iterative part.
*
* The input stream of the iterate operator and the feedback stream will be treated
- * as a ConnectedDataStream where the the input is connected with the feedback stream.
+ * as a ConnectedStreams where the input is connected with the feedback stream.
*
* This allows the user to distinguish standard input from feedback inputs.
*
@@ -420,7 +420,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* to 0 then the iteration sources will run indefinitely, so the job must be killed to stop.
*
*/
- def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedDataStream[T, F] =>
+ def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
(DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
@@ -712,21 +712,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
/**
- * Initiates a temporal cross transformation that builds all pair
- * combinations of elements of both DataStreams, i.e., it builds a Cartesian
- * product.
- *
- * This method returns a StreamJoinOperator on which the
- * .onWindow(..) should be called to define the
- * window, and then the .where(..) and .equalTo(..) methods can be used to defin
- * the join keys.</p> The user can also use the apply method of the returned JoinedStream
- * to use custom join function.
- *
- */
- def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
- new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
-
- /**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of .toString is
* written.
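
To make the new iterate signature concrete, a sketch following the same pattern as the updated Scala test further below: the step function receives the input connected with the feedback stream as a ConnectedStreams and returns the (feedback, output) pair. The setup names are illustrative:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // mapped first so the iteration head sees a rebalanced input, as in the test
    val source: DataStream[Int] = env.fromElements(1, 2, 3).map { t: Int => t }

    // Step function over ConnectedStreams[Int, String]:
    // the first returned stream is fed back, the second is forwarded downstream.
    val iterated: DataStream[String] = source.iterate((input: ConnectedStreams[Int, String]) => {
      val head = input.map(i => (i + 1).toString, s => s)
      (head.filter(_ == "2"), head.filter(_ != "2"))
    }, 1000)

    iterated.print()
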
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
deleted file mode 100644
index 0060a9f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.CrossFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
-
-import scala.reflect.ClassTag
-
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
- TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
-
- override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
-
- val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
- (l: I1, r: I2) => (l, r))
-
-
- val returnType = createTuple2TypeInformation[I1, I2](input1.getType, input2.getType)
- val javaStream = input1.connect(input2).addGeneralWindowCombine(
- crossWindowFunction,
- returnType, windowSize,
- slideInterval, timeStamp1, timeStamp2)
-
- new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
- }
-}
-object StreamCrossOperator {
-
- private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
- javaStream: JavaStream[(I1, I2)]) extends
- DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
-
- /**
- * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
- * call will be emitted.
- *
- */
- def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
- val cleanCrossWindowFunction = clean(getCrossWindowFunction(op, fun))
-
- op.input1.connect(op.input2).addGeneralWindowCombine(
- cleanCrossWindowFunction,
- implicitly[TypeInformation[R]],
- op.windowSize,
- op.slideInterval,
- op.timeStamp1,
- op.timeStamp2)
- }
-
- override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
- every(timeUnit.toMillis(length))
- }
-
- override def every(length: Long): CrossWindow[I1, I2] = {
- val graph = javaStream.getExecutionEnvironment().getStreamGraph()
- val operator = graph.getStreamNode(javaStream.getId()).getOperator()
- operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length)
- this
- }
- }
-
- private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
- crossFunction: (I1, I2) => R):
- CrossWindowFunction[I1, I2, R] = {
- require(crossFunction != null, "Join function must not be null.")
-
- val cleanFun = op.input1.clean(crossFunction)
- val crossFun = new CrossFunction[I1, I2, R] {
- override def cross(first: I1, second: I2): R = {
- cleanFun(first, second)
- }
- }
-
- new CrossWindowFunction[I1, I2, R](crossFun)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index e2be44f..f584767 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.operators.Keys
import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
import org.apache.flink.streaming.util.keys.KeySelectorUtil
import scala.Array.canBuildFrom
@@ -151,10 +150,11 @@ object StreamJoinOperator {
private def createJoinOperator(): JavaStream[(I1, I2)] = {
- val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
- op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
- .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
- returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+// val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
+// op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+// .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+// returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+ null
}
}
@@ -172,14 +172,15 @@ object StreamJoinOperator {
val cleanFun = clean(getJoinWindowFunction(jp, fun))
- op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
- .addGeneralWindowCombine[R](
- cleanFun,
- implicitly[TypeInformation[R]],
- op.windowSize,
- op.slideInterval,
- op.timeStamp1,
- op.timeStamp2)
+// op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
+// .addGeneralWindowCombine[R](
+// cleanFun,
+// implicitly[TypeInformation[R]],
+// op.windowSize,
+// op.slideInterval,
+// op.timeStamp1,
+// op.timeStamp2)
+ null
}
}
@@ -195,7 +196,8 @@ object StreamJoinOperator {
}
}
- new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+// new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+ null
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 59843e2..b8a3b94 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => JavaConStream }
import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
import language.implicitConversions
@@ -48,7 +48,7 @@ package object scala {
new SplitDataStream[R](javaStream)
implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
- ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+ ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
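
The effect of the renamed implicit is that a ConnectedStreams produced by the Java API can be used wherever the Scala wrapper is expected; a minimal sketch (the helper name is illustrative):

    import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaConnectedStreams}
    import org.apache.flink.streaming.api.scala._

    // Compiles only because javaToScalaConnectedStream is in implicit scope:
    // the Java object is wrapped so the Scala-only lambda operations become available.
    def asScala[IN1, IN2](javaStreams: JavaConnectedStreams[IN1, IN2]): ConnectedStreams[IN1, IN2] =
      javaStreams
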
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 8b4d527..606aac5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -84,7 +84,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
/**
* Tests that {@link DataStream#groupBy} and {@link DataStream#partitionBy(KeySelector)} result in
- * different and correct topologies. Does the some for the {@link ConnectedDataStream}.
+ * different and correct topologies. Does the same for the {@link ConnectedStreams}.
*/
@Test
def testPartitioning(): Unit = {
@@ -144,21 +144,21 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid2)))
assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid3)))
- //Testing ConnectedDataStream grouping
- val connectedGroup1: ConnectedDataStream[_, _] = connected.groupBy(0, 0)
+ //Testing ConnectedStreams grouping
+ val connectedGroup1: ConnectedStreams[_, _] = connected.groupBy(0, 0)
val downStreamId1: Integer = createDownStreamId(connectedGroup1)
- val connectedGroup2: ConnectedDataStream[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
+ val connectedGroup2: ConnectedStreams[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
val downStreamId2: Integer = createDownStreamId(connectedGroup2)
- val connectedGroup3: ConnectedDataStream[_, _] = connected.groupBy("_1", "_1")
+ val connectedGroup3: ConnectedStreams[_, _] = connected.groupBy("_1", "_1")
val downStreamId3: Integer = createDownStreamId(connectedGroup3)
- val connectedGroup4: ConnectedDataStream[_, _] =
+ val connectedGroup4: ConnectedStreams[_, _] =
connected.groupBy(Array[String]("_1"), Array[String]("_1"))
val downStreamId4: Integer = createDownStreamId(connectedGroup4)
- val connectedGroup5: ConnectedDataStream[_, _] = connected.groupBy(x => x._1, x => x._1)
+ val connectedGroup5: ConnectedStreams[_, _] = connected.groupBy(x => x._1, x => x._1)
val downStreamId5: Integer = createDownStreamId(connectedGroup5)
assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId1)))
@@ -176,22 +176,22 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId5)))
assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId5)))
- //Testing ConnectedDataStream partitioning
- val connectedPartition1: ConnectedDataStream[_, _] = connected.partitionByHash(0, 0)
+ //Testing ConnectedStreams partitioning
+ val connectedPartition1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0)
val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
- val connectedPartition2: ConnectedDataStream[_, _] =
+ val connectedPartition2: ConnectedStreams[_, _] =
connected.partitionByHash(Array[Int](0), Array[Int](0))
val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
- val connectedPartition3: ConnectedDataStream[_, _] = connected.partitionByHash("_1", "_1")
+ val connectedPartition3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1")
val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
- val connectedPartition4: ConnectedDataStream[_, _] =
+ val connectedPartition4: ConnectedStreams[_, _] =
connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
- val connectedPartition5: ConnectedDataStream[_, _] =
+ val connectedPartition5: ConnectedStreams[_, _] =
connected.partitionByHash(x => x._1, x => x._1)
val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
@@ -492,7 +492,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
// we need to rebalance before iteration
val source = env.fromElements(1, 2, 3).map { t: Int => t }
- val iterated = source.iterate((input: ConnectedDataStream[Int, String]) => {
+ val iterated = source.iterate((input: ConnectedStreams[Int, String]) => {
val head = input.map(i => (i + 1).toString, s => s)
(head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000).print()
@@ -501,7 +501,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
(input.map(_ + 1), input.map(_.toString)), 2000)
try {
- val invalid = source.iterate((input: ConnectedDataStream[Int, String]) => {
+ val invalid = source.iterate((input: ConnectedStreams[Int, String]) => {
val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
(head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000).print()
@@ -546,7 +546,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
return dataStream.print.getTransformation.getId
}
- private def createDownStreamId(dataStream: ConnectedDataStream[_, _]): Integer = {
+ private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {
val m = dataStream.map(x => 0, x => 0)
m.print()
m.getId
http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index f53b986..66fe197 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -43,11 +43,11 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.streaming.api.datastream.DataStream.transform",
"org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
"org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
- "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getExecutionEnvironment",
- "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType1",
- "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType2",
- "org.apache.flink.streaming.api.datastream.ConnectedDataStream.addGeneralWindowCombine",
- "org.apache.flink.streaming.api.datastream.ConnectedDataStream.transform",
+ "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
+ "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
+ "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
+ "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
+ "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
"org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
@@ -104,9 +104,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
classOf[DataStream[_]])
checkMethods(
- "ConnectedDataStream", "ConnectedDataStream",
- classOf[org.apache.flink.streaming.api.datastream.ConnectedDataStream[_,_]],
- classOf[ConnectedDataStream[_,_]])
+ "ConnectedStreams", "ConnectedStreams",
+ classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
+ classOf[ConnectedStreams[_,_]])
checkMethods(
"SplitDataStream", "SplitDataStream",
@@ -114,11 +114,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
classOf[SplitDataStream[_]])
checkMethods(
- "StreamCrossOperator", "StreamCrossOperator",
- classOf[org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator[_,_]],
- classOf[StreamCrossOperator[_,_]])
-
- checkMethods(
"StreamJoinOperator", "StreamJoinOperator",
classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
classOf[StreamJoinOperator[_,_]])