You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:40 UTC

[06/13] flink git commit: [FLINK-2550] Rename ConnectedDataStream to ConnectedStreams, Remove some operations

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
deleted file mode 100644
index dc6ea34..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamReduce<IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected IN1 currentValue1 = null;
-	protected IN2 currentValue2 = null;
-
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
-	public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-		super(coReducer);
-		currentValue1 = null;
-		currentValue2 = null;
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		if (currentValue1 != null) {
-			currentValue1 = userFunction.reduce1(currentValue1, element.getValue());
-		} else {
-			currentValue1 = element.getValue();
-		}
-		output.collect(element.replace(userFunction.map1(currentValue1)));
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		if (currentValue2 != null) {
-			currentValue2 = userFunction.reduce2(currentValue2, element.getValue());
-		} else {
-			currentValue2 = element.getValue();
-		}
-		output.collect(element.replace(userFunction.map2(currentValue2)));
-	}
-
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
deleted file mode 100644
index 4bfe2ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.math.util.MathUtils;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.state.CircularFifoList;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class CoStreamWindow<IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, CoWindowFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected long windowSize;
-	protected long slideSize;
-	protected CircularFifoList<StreamRecord<IN1>> circularList1;
-	protected CircularFifoList<StreamRecord<IN2>> circularList2;
-	protected TimestampWrapper<IN1> timeStamp1;
-	protected TimestampWrapper<IN2> timeStamp2;
-
-	protected StreamWindow window;
-
-	protected long startTime;
-	protected long nextRecordTime;
-
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
-	public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
-			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
-		super(coWindowFunction);
-		this.windowSize = windowSize;
-		this.slideSize = slideInterval;
-		this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
-		this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
-		this.timeStamp1 = timeStamp1;
-		this.timeStamp2 = timeStamp2;
-		this.startTime = timeStamp1.getStartTime();
-
-		this.window = new StreamWindow();
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		window.addToBuffer1(element.getValue());
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		window.addToBuffer2(element.getValue());
-	}
-
-	@SuppressWarnings("unchecked")
-	protected void callUserFunction() throws Exception {
-
-		List<IN1> first = new ArrayList<IN1>();
-		List<IN2> second = new ArrayList<IN2>();
-
-		// TODO: Give operators a way to copy elements
-
-		for (IN1 element : window.circularList1.getElements()) {
-			first.add(element);
-		}
-		for (IN2 element : window.circularList2.getElements()) {
-			second.add(element);
-		}
-
-		TimestampedCollector<OUT> timestampedCollector = new TimestampedCollector<OUT>(output);
-		timestampedCollector.setTimestamp(System.currentTimeMillis());
-		if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
-			userFunction.coWindow(first, second, timestampedCollector);
-		}
-	}
-
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	protected class StreamWindow implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		protected int granularity;
-		protected int batchPerSlide;
-		protected long numberOfBatches;
-
-		protected long minibatchCounter;
-
-		protected CircularFifoList<IN1> circularList1;
-		protected CircularFifoList<IN2> circularList2;
-
-		public StreamWindow() {
-			this.granularity = (int) MathUtils.gcd(windowSize, slideSize);
-			this.batchPerSlide = (int) (slideSize / granularity);
-			this.numberOfBatches = windowSize / granularity;
-			this.circularList1 = new CircularFifoList<IN1>();
-			this.circularList2 = new CircularFifoList<IN2>();
-			this.minibatchCounter = 0;
-		}
-
-		public void addToBuffer1(IN1 nextValue) throws Exception {
-			checkWindowEnd(timeStamp1.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList1.add(nextValue);
-			}
-		}
-
-		public void addToBuffer2(IN2 nextValue) throws Exception {
-			checkWindowEnd(timeStamp2.getTimestamp(nextValue));
-			if (minibatchCounter >= 0) {
-				circularList2.add(nextValue);
-			}
-		}
-
-		protected synchronized void checkWindowEnd(long timeStamp) throws Exception{
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				circularList1.newSlide();
-				circularList2.newSlide();
-				minibatchCounter++;
-				if (windowEnd()) {
-					callUserFunction();
-					circularList1.shiftWindow(batchPerSlide);
-					circularList2.shiftWindow(batchPerSlide);
-				}
-			}
-		}
-
-		protected boolean miniBatchEnd() {
-			if (nextRecordTime < startTime + granularity) {
-				return false;
-			} else {
-				startTime += granularity;
-				return true;
-			}
-		}
-
-		public boolean windowEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-		public void reduceLastBatch() throws Exception{
-			if (!miniBatchEnd()) {
-				callUserFunction();
-			}
-		}
-
-		public Iterable<IN1> getIterable1() {
-			return circularList1.getIterable();
-		}
-
-		public Iterable<IN2> getIterable2() {
-			return circularList2.getIterable();
-		}
-
-		@Override
-		public String toString() {
-			return circularList1.toString();
-		}
-
-	}
-
-	@Override
-	public void close() throws Exception {
-		if (!window.miniBatchEnd()) {
-			try {
-				callUserFunction();
-			} catch (Exception e) {
-				throw new RuntimeException("Could not call user function in CoStreamWindow.close()", e);
-			}
-		}
-		super.close();
-	}
-
-	public void setSlideSize(long slideSize) {
-		this.slideSize = slideSize;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 9775392..337d97b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -62,7 +62,6 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -127,7 +126,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 	/**
 	 * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionByHash} result in
-	 * different and correct topologies. Does the some for the {@link ConnectedDataStream}.
+	 * different and correct topologies. Does the some for the {@link ConnectedStreams}.
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
@@ -136,7 +135,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
 		DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-		ConnectedDataStream connected = src1.connect(src2);
+		ConnectedStreams connected = src1.connect(src2);
 
 		//Testing DataStream grouping
 		DataStream group1 = src1.groupBy(0);
@@ -204,20 +203,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertFalse(isGrouped(customPartition3));
 		assertFalse(isGrouped(customPartition4));
 
-		//Testing ConnectedDataStream grouping
-		ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
+		//Testing ConnectedStreams grouping
+		ConnectedStreams connectedGroup1 = connected.groupBy(0, 0);
 		Integer downStreamId1 = createDownStreamId(connectedGroup1);
 
-		ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
+		ConnectedStreams connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
 		Integer downStreamId2 = createDownStreamId(connectedGroup2);
 
-		ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
+		ConnectedStreams connectedGroup3 = connected.groupBy("f0", "f0");
 		Integer downStreamId3 = createDownStreamId(connectedGroup3);
 
-		ConnectedDataStream connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
+		ConnectedStreams connectedGroup4 = connected.groupBy(new String[]{"f0"}, new String[]{"f0"});
 		Integer downStreamId4 = createDownStreamId(connectedGroup4);
 
-		ConnectedDataStream connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
+		ConnectedStreams connectedGroup5 = connected.groupBy(new FirstSelector(), new FirstSelector());
 		Integer downStreamId5 = createDownStreamId(connectedGroup5);
 
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -241,20 +240,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isGrouped(connectedGroup4));
 		assertTrue(isGrouped(connectedGroup5));
 
-		//Testing ConnectedDataStream partitioning
-		ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
+		//Testing ConnectedStreams partitioning
+		ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
 		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
 
-		ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
+		ConnectedStreams connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
 		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
 
-		ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
+		ConnectedStreams connectedPartition3 = connected.partitionByHash("f0", "f0");
 		Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
 
-		ConnectedDataStream connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+		ConnectedStreams connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
 		Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
 
-		ConnectedDataStream connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+		ConnectedStreams connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
 		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
 
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
@@ -470,7 +469,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		StreamEdge splitEdge = env.getStreamGraph().getStreamEdge(unionFilter.getId(), sink.getTransformation().getId());
 		assertEquals("a", splitEdge.getSelectedNames().get(0));
 
-		ConnectedDataStream<Integer, Integer> connect = map.connect(flatMap);
+		ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
 		CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
 			@Override
 			public String map1(Integer value) {
@@ -606,7 +605,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return dataStream instanceof GroupedDataStream;
 	}
 
-	private static Integer createDownStreamId(ConnectedDataStream dataStream) {
+	private static Integer createDownStreamId(ConnectedStreams dataStream) {
 		SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
 			@Override
 			public Object map1(Tuple2<Long, Long> value) {
@@ -622,8 +621,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return coMap.getId();
 	}
 
-	private static boolean isGrouped(ConnectedDataStream dataStream) {
-		return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
+	private static boolean isGrouped(ConnectedStreams dataStream) {
+		return (dataStream.getFirstInput() instanceof GroupedDataStream && dataStream.getSecondInput() instanceof GroupedDataStream);
 	}
 
 	private static boolean isPartitioned(StreamEdge edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 285ee57..774f58d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStreams;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
 import org.apache.flink.streaming.util.ReceiveCheckNoOpSink;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
@@ -112,7 +112,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// introduce dummy mapper to get to correct parallelism
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
-		ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+		ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
 				Integer.class);
 
 
@@ -151,7 +151,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
 
 		IterativeDataStream<Integer> iter1 = source.iterate();
-		ConnectedIterativeDataStream<Integer, Integer> coIter = source.iterate().withFeedbackType(
+		ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
 				Integer.class);
 
 
@@ -181,7 +181,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 
 		IterativeDataStream<Integer> iter1 = source.iterate();
 		// Calling withFeedbackType should create a new iteration
-		ConnectedIterativeDataStream<Integer, String> iter2 = iter1.withFeedbackType(String.class);
+		ConnectedIterativeDataStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
 
 		iter1.closeWith(iter1.map(NoOpIntMap)).print();
 		iter2.closeWith(iter2.map(NoOpCoMap)).print();
@@ -395,7 +395,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 				.map(NoOpStrMap).name("ParallelizeMap");
 
 
-		ConnectedIterativeDataStream<Integer, String> coIt = env.fromElements(0, 0)
+		ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
 				.map(NoOpIntMap).name("ParallelizeMap")
 				.iterate(2000)
 				.withFeedbackType("String");
@@ -403,7 +403,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		try {
 			coIt.groupBy(1, 2);
 			fail();
-		} catch (UnsupportedOperationException e) {
+		} catch (InvalidProgramException e) {
 			// this is expected
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 0989128..4c0f59f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -30,8 +29,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
@@ -72,12 +69,6 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
 			fail();
 		} catch (Exception e) {
 		}
-		try {
-			source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
-					.print();
-			fail();
-		} catch (Exception e) {
-		}
 
 		env.addSource(new TestSource<Integer>()).returns("Integer");
 		source.map(new TestMap<Long, Long>()).returns(Long.class).print();
@@ -86,9 +77,6 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
 		source.connect(source).flatMap(new TestCoFlatMap<Long, Long, Integer>())
 				.returns(BasicTypeInfo.INT_TYPE_INFO).print();
 		
-		source.connect(source).windowReduce(new TestCoWindow<Long, Long, String>(), 10, 100)
-				.returns("String").print();
-
 		assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
 				source.map(new TestMap<Long, Long>()).returns(Long.class).getType());
 
@@ -161,38 +149,4 @@ public class TypeFillTest extends StreamingMultipleProgramsTestBase {
 		}
 
 	}
-
-	private class TestCoReduce<IN1, IN2, OUT> implements CoReduceFunction<IN1, IN2, OUT> {
-
-		@Override
-		public IN1 reduce1(IN1 value1, IN1 value2) {
-			return null;
-		}
-
-		@Override
-		public IN2 reduce2(IN2 value1, IN2 value2) {
-			return null;
-		}
-
-		@Override
-		public OUT map1(IN1 value) {
-			return null;
-		}
-
-		@Override
-		public OUT map2(IN2 value) {
-			return null;
-		}
-
-	}
-
-	private class TestCoWindow<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
-
-		@Override
-		public void coWindow(List<IN1> first, List<IN2> second, Collector<OUT> out)
-				throws Exception {
-		}
-
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index 508f1a2..0137682 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -115,23 +115,6 @@ public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
 				.map(new ResultMap())
 				.addSink(joinResultSink);
 
-		inStream1
-				.cross(inStream2)
-				.onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(),
-						new MyTimestamp<Tuple1<Integer>>(), 100)
-				.with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> cross(
-							Tuple2<Integer, String> val1, Tuple1<Integer> val2) throws Exception {
-						return new Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>(val1, val2);
-					}
-				})
-				.map(new ResultMap())
-				.addSink(crossResultSink);
-
 		env.execute();
 
 		assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults),

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 3b05274..2246ffd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -220,7 +220,7 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
 		DataStream<Integer> source1 = env.fromElements(1, 10);
 		DataStream<Integer> source2 = env.fromElements(2, 11);
 
-		ConnectedDataStream<Integer, Integer> connectedSource = source1.connect(source2);
+		ConnectedStreams<Integer, Integer> connectedSource = source1.connect(source2);
 
 		OutputTypeConfigurableOperationWithTwoInputs outputTypeConfigurableOperation = new OutputTypeConfigurableOperationWithTwoInputs();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
deleted file mode 100644
index a1e9f74..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala.ClosureCleaner
-import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction}
-import org.apache.flink.util.Collector
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input and
-   * @param fun2 for each element of the second input. Each
-   * CoMapFunction call returns exactly one element.
-   *
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
-  DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val comapper = new CoMapFunction[IN1, IN2, R] {
-      def map1(in1: IN1): R = cleanFun1(in1)
-      def map2(in2: IN2): R = cleanFun2(in2)
-    }
-
-    map(comapper)
-  }
-
-  /**
-   * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
-   * the output to a common type. The transformation calls a
-   * {@link CoMapFunction#map1} for each element of the first input and
-   * {@link CoMapFunction#map2} for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * {@link RichCoMapFunction} to gain access to other features provided by
-   * the {@link RichFuntion} interface.
-   *
-   * @param coMapper
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coMapper == null) {
-      throw new NullPointerException("Map function must not be null.")
-    }
-
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none. The user can also extend {@link RichFlatMapFunction} to
-   * gain access to other features provided by the {@link RichFuntion}
-   * interface.
-   *
-   * @param coFlatMapper
-   * The CoFlatMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coFlatMapper == null) {
-      throw new NullPointerException("FlatMap function must not be null.")
-    }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   *
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
-      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
-      def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
-   * maps the output to a common type. The transformation calls a
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   *
-   * @return The transformed { @link DataStream}
-   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
-      fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
-    if (fun1 == null || fun2 == null) {
-      throw new NullPointerException("FlatMap functions must not be null.")
-    }
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
-      def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
-      def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
-    }
-    flatMap(flatMapper)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param keyPositions1
-   * The fields used to group the first input stream.
-   * @param keyPositions2
-   * The fields used to group the second input stream.
-   * @return @return The transformed { @link ConnectedDataStream}
-   */
-  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-   *
-   * @param field1
-   * The grouping expression for the first input
-   * @param field2
-   * The grouping expression for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(field1, field2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the {@link DataStream}S underlying type. A dot can be
-   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-   * .
-   *
-   * @param fields1
-   * The grouping expressions for the first input
-   * @param fields2
-   * The grouping expressions for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy(fields1: Array[String], fields2: Array[String]): 
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.groupBy(fields1, fields2)
-  }
-
-  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * function on grouped data streams for example
-   * {@link ConnectedDataStream#reduce}
-   *
-   * @param fun1
-   * The function used for grouping the first input
-   * @param fun2
-   * The function used for grouping the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
-  ConnectedDataStream[IN1, IN2] = {
-
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-    val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = cleanFun1(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = cleanFun2(in)
-    }
-
-    javaStream.groupBy(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2.
-   *
-   * @param keyPosition1
-   * The field used to compute the hashcode of the elements in the
-   * first input stream.
-   * @param keyPosition2
-   * The field used to compute the hashcode of the elements in the
-   * second input stream.
-   * @return The transformed { @link ConnectedDataStream}
-   */
-  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionByHash(keyPosition1, keyPosition2)
-  }
-
-  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2.
-   *
-   * @param keyPositions1
-   * The fields used to partition the first input stream.
-   * @param keyPositions2
-   * The fields used to partition the second input stream.
-   * @return The transformed { @link ConnectedDataStream}
-   */
-  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionByHash(keyPositions1, keyPositions2)
-  }
-
-  /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-   *
-   * @param field1
-   * The partitioning expression for the first input
-   * @param field2
-   * The partitioning expression for the second input
-   * @return The grouped { @link ConnectedDataStream}
-   */
-  def partitionByHash(field1: String, field2: String): ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionByHash(field1, field2)
-  }
-
-  /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to fields1 and fields2.
-   *
-   * @param fields1
-   * The partitioning expressions for the first input
-   * @param fields2
-   * The partitioning expressions for the second input
-   * @return The partitioned { @link ConnectedDataStream}
-   */
-  def partitionByHash(fields1: Array[String], fields2: Array[String]):
-  ConnectedDataStream[IN1, IN2] = {
-    javaStream.partitionByHash(fields1, fields2)
-  }
-
-  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 using fun1 and fun2.
-   *
-   * @param fun1
-   * The function used for partitioning the first input
-   * @param fun2
-   * The function used for partitioning the second input
-   * @return The partitioned { @link ConnectedDataStream}
-   */
-  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
-  ConnectedDataStream[IN1, IN2] = {
-
-    val cleanFun1 = clean(fun1)
-    val cleanFun2 = clean(fun2)
-
-    val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = cleanFun1(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = cleanFun2(in)
-    }
-
-    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @param coReducer
-   * The { @link CoReduceFunction} that will be called for every
-   *             element of the inputs.
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](coReducer: CoReduceFunction[IN1, IN2, R]): 
-  DataStream[R] = {
-    if (coReducer == null) {
-      throw new NullPointerException("Reduce function must not be null.")
-    }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    javaStream.reduce(coReducer).returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
-   * the outputs to a common type. If the {@link ConnectedDataStream} is
-   * batched or windowed then the reduce transformation is applied on every
-   * sliding batch/window of the data stream. If the connected data stream is
-   * grouped then the reducer is applied on every group of elements sharing
-   * the same key. This type of reduce is much faster than reduceGroup since
-   * the reduce function can be applied incrementally.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def reduce[R: TypeInformation: ClassTag](reducer1: (IN1, IN1) => IN1,
-      reducer2: (IN2, IN2) => IN2,mapper1: IN1 => R, mapper2: IN2 => R): DataStream[R] = {
-    if (mapper1 == null || mapper2 == null) {
-      throw new NullPointerException("Map functions must not be null.")
-    }
-    if (reducer1 == null || reducer2 == null) {
-      throw new NullPointerException("Reduce functions must not be null.")
-    }
-
-    val cleanReducer1 = clean(reducer1)
-    val cleanReducer2 = clean(reducer2)
-    val cleanMapper1 = clean(mapper1)
-    val cleanMapper2 = clean(mapper2)
-
-    val reducer = new CoReduceFunction[IN1, IN2, R] {
-      def reduce1(value1: IN1, value2: IN1): IN1 = cleanReducer1(value1, value2)
-      def reduce2(value1: IN2, value2: IN2): IN2 = cleanReducer2(value1, value2)
-      def map1(value: IN1): R = cleanMapper1(value)
-      def map2(value: IN2): R = cleanMapper2(value)
-    }
-    reduce(reducer)
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for for
-   * time aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindowFunction
-   * The { @link CoWindowFunction} that will be applied for the time
-   *             windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: 
-      CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long):
-      DataStream[R] = {
-    if (coWindowFunction == null) {
-      throw new NullPointerException("CoWindow function must no be null")
-    }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    
-    javaStream.windowReduce(coWindowFunction, windowSize, slideInterval).
-    returns(outType).asInstanceOf[JavaStream[R]]
-  }
-
-  /**
-   * Applies a CoWindow transformation on the connected DataStreams. The
-   * transformation calls the {@link CoWindowFunction#coWindow} method for for
-   * time aligned windows of the two data streams. System time is used as
-   * default to compute windows.
-   *
-   * @param coWindower
-   * The coWindowing function to be applied for the time windows.
-   * @param windowSize
-   * Size of the windows that will be aligned for both streams in
-   * milliseconds.
-   * @param slideInterval
-   * After every function call the windows will be slid by this
-   * interval.
-   *
-   * @return The transformed { @link DataStream}.
-   */
-  def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], 
-      Collector[R]) => Unit, windowSize: Long, slideInterval: Long):
-      DataStream[R] = {
-    if (coWindower == null) {
-      throw new NullPointerException("CoWindow function must no be null")
-    }
-
-    val cleanCoWindower = clean(coWindower)
-
-    val coWindowFun = new CoWindowFunction[IN1, IN2, R] {
-      def coWindow(first: util.List[IN1], second: util.List[IN2], 
-          out: Collector[R]): Unit = cleanCoWindower(first.asScala, second.asScala, out)
-    }
-
-    windowReduce(coWindowFun, windowSize, slideInterval)
-  }
-
-  /**
-   * Returns the first {@link DataStream}.
-   *
-   * @return The first DataStream.
-   */
-  def getFirst(): DataStream[IN1] = {
-    javaStream.getFirst
-  }
-
-  /**
-   * Returns the second {@link DataStream}.
-   *
-   * @return The second DataStream.
-   */
-  def getSecond(): DataStream[IN2] = {
-    javaStream.getSecond
-  }
-
-  /**
-   * Gets the type of the first input
-   *
-   * @return The type of the first input
-   */
-  def getInputType1(): TypeInformation[IN1] = {
-    javaStream.getType1
-  }
-
-  /**
-   * Gets the type of the second input
-   *
-   * @return The type of the second input
-   */
-  def getInputType2(): TypeInformation[IN2] = {
-    javaStream.getType2
-  }
-
-  /**
-   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
-   */
-  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
new file mode 100644
index 0000000..41c1a7a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+ * [[ConnectedStreams]] represents two connected streams of (possibly) different data types. It
+ * can be used to apply transformations such as [[CoMapFunction]] on two
+ * [[DataStream]]s.
+ */
+class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
+
+  /**
+   * Applies a CoMap transformation on these [[ConnectedStreams]] and maps the
+   * output to a common type. `fun1` is called for each element of the first
+   * input and `fun2` for each element of the second input; each call returns
+   * exactly one element. Throws a NullPointerException if either is null.
+   *
+   * @param fun1 Function applied to each element of the first input
+   * @param fun2 Function applied to each element of the second input
+   * @return The transformed [[DataStream]], produced by wrapping both
+   *         functions in a [[CoMapFunction]]
+   */
+  def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R): 
+  DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+    val comapper = new CoMapFunction[IN1, IN2, R] {
+      def map1(in1: IN1): R = cleanFun1(in1)
+      def map2(in2: IN2): R = cleanFun2(in2)
+    }
+
+    map(comapper)
+  }
+
+  /**
+   * Applies a CoMap transformation on these [[ConnectedStreams]] and maps
+   * the output to a common type. The transformation calls
+   * [[CoMapFunction#map1]] for each element of the first input and
+   * [[CoMapFunction#map2]] for each element of the second input. Each
+   * CoMapFunction call returns exactly one element. The user can also extend
+   * `RichCoMapFunction` to gain access to other features provided by
+   * the `RichFunction` interface.
+   *
+   * @param coMapper
+   * The CoMapFunction used to jointly transform the two input
+   * DataStreams; must not be null (a NullPointerException is thrown otherwise)
+   * @return The transformed [[DataStream]], typed with the implicit TypeInformation of R
+   */
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
+    if (coMapper == null) {
+      throw new NullPointerException("Map function must not be null.")
+    }
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on these [[ConnectedStreams]] and
+   * maps the output to a common type. The transformation calls
+   * [[CoFlatMapFunction#flatMap1]] for each element of the first input
+   * and [[CoFlatMapFunction#flatMap2]] for each element of the second
+   * input. Each CoFlatMapFunction call returns any number of elements
+   * including none. The user can also extend `RichCoFlatMapFunction` to
+   * gain access to other features provided by the `RichFunction`
+   * interface.
+   *
+   * @param coFlatMapper
+   * The CoFlatMapFunction used to jointly transform the two input
+   * DataStreams; must not be null (a NullPointerException is thrown otherwise)
+   * @return The transformed [[DataStream]]
+   */
+  def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
+  DataStream[R] = {
+    if (coFlatMapper == null) {
+      throw new NullPointerException("FlatMap function must not be null.")
+    }
+    
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on these [[ConnectedStreams]] and
+   * maps the output to a common type. Each function may emit any number of
+   * elements, including none, through the [[Collector]] it is given.
+   * A NullPointerException is thrown if either function is null.
+   *
+   * @param fun1 Function called for each element of the first input
+   * @param fun2 Function called for each element of the second input
+   * @return The transformed [[DataStream]]
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit, 
+      fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]): Unit = cleanFun1(value, out)
+      def flatMap2(value: IN2, out: Collector[R]): Unit = cleanFun2(value, out)
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * Applies a CoFlatMap transformation on these [[ConnectedStreams]] and
+   * maps the output to a common type. Every element of the collection
+   * returned by a function is forwarded to the output, so a call may
+   * produce any number of elements, including none.
+   *
+   * @param fun1 Function called for each element of the first input
+   * @param fun2 Function called for each element of the second input
+   * @return The transformed [[DataStream]]
+   */
+  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+      fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+    if (fun1 == null || fun2 == null) {
+      throw new NullPointerException("FlatMap functions must not be null.")
+    }
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+    val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
+      def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
+      def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
+    }
+    flatMap(flatMapper)
+  }
+
+  /**
+   * GroupBy operation for connected data streams. Groups the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2, so that
+   * subsequent functions are applied on grouped data streams. The call is
+   * delegated to the underlying Java stream.
+   *
+   * @param keyPosition1
+   * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+   * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return The grouped [[ConnectedStreams]]
+   */
+  def groupBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+    javaStream.groupBy(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * GroupBy operation for connected data streams. Groups the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2, so that
+   * subsequent functions are applied on grouped data streams. The call is
+   * delegated to the underlying Java stream.
+   *
+   * @param keyPositions1
+   * The fields used to group the first input stream.
+   * @param keyPositions2
+   * The fields used to group the second input stream.
+   * @return The grouped [[ConnectedStreams]]
+   */
+  def groupBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): 
+  ConnectedStreams[IN1, IN2] = {
+    javaStream.groupBy(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * GroupBy operation for connected data streams using key expressions. Groups
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the [[DataStream]]'s underlying type. A dot can be used
+   * to drill down into objects, as in `"field1.getInnerField2()"`.
+   *
+   * @param field1
+   * The grouping expression for the first input
+   * @param field2
+   * The grouping expression for the second input
+   * @return The grouped [[ConnectedStreams]]
+   */
+  def groupBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+    javaStream.groupBy(field1, field2)
+  }
+
+  /**
+   * GroupBy operation for connected data streams using key expressions. Groups
+   * the elements of input1 and input2 according to fields1 and fields2. A
+   * field expression is either the name of a public field or a getter method
+   * with parentheses of the [[DataStream]]'s underlying type. A dot can be
+   * used to drill down into objects, as in
+   * `"field1.getInnerField2()"`.
+   *
+   * @param fields1
+   * The grouping expressions for the first input
+   * @param fields2
+   * The grouping expressions for the second input
+   * @return The grouped [[ConnectedStreams]]
+   */
+  def groupBy(fields1: Array[String], fields2: Array[String]): 
+  ConnectedStreams[IN1, IN2] = {
+    javaStream.groupBy(fields1, fields2)
+  }
+
+  /**
+   * GroupBy operation for connected data streams. Groups the elements of
+   * input1 and input2 using the keys extracted by fun1 and fun2. Both
+   * functions are closure-cleaned and wrapped in a [[KeySelector]] before
+   * being handed to the underlying Java stream.
+   *
+   * @param fun1
+   * The function used for grouping the first input
+   * @param fun2
+   * The function used for grouping the second input
+   * @return The grouped [[ConnectedStreams]]
+   */
+  def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  ConnectedStreams[IN1, IN2] = {
+
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+    val keyExtractor1 = new KeySelector[IN1, K] {
+      def getKey(in: IN1) = cleanFun1(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, L] {
+      def getKey(in: IN2) = cleanFun2(in)
+    }
+
+    javaStream.groupBy(keyExtractor1, keyExtractor2)
+  }
+
+  /**
+   * PartitionBy operation for connected data streams. Partitions the elements of
+   * input1 and input2 according to keyPosition1 and keyPosition2.
+   *
+   * @param keyPosition1
+   * The field used to compute the hashcode of the elements in the
+   * first input stream.
+   * @param keyPosition2
+   * The field used to compute the hashcode of the elements in the
+   * second input stream.
+   * @return The partitioned [[ConnectedStreams]]
+   */
+  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
+    javaStream.partitionByHash(keyPosition1, keyPosition2)
+  }
+
+  /**
+   * PartitionBy operation for connected data streams. Partitions the elements of
+   * input1 and input2 according to keyPositions1 and keyPositions2.
+   *
+   * @param keyPositions1
+   * The fields used to partition the first input stream.
+   * @param keyPositions2
+   * The fields used to partition the second input stream.
+   * @return The partitioned [[ConnectedStreams]]
+   */
+  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
+  ConnectedStreams[IN1, IN2] = {
+    javaStream.partitionByHash(keyPositions1, keyPositions2)
+  }
+
+  /**
+   * PartitionBy operation for connected data streams using key expressions. Partitions
+   * the elements of input1 and input2 according to field1 and field2. A field
+   * expression is either the name of a public field or a getter method with
+   * parentheses of the [[DataStream]]'s underlying type. A dot can be used
+   * to drill down into objects, as in `"field1.getInnerField2()"`.
+   *
+   * @param field1
+   * The partitioning expression for the first input
+   * @param field2
+   * The partitioning expression for the second input
+   * @return The partitioned [[ConnectedStreams]]
+   */
+  def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
+    javaStream.partitionByHash(field1, field2)
+  }
+
+  /**
+   * PartitionBy operation for connected data streams using key expressions. Partitions
+   * the elements of input1 and input2 according to fields1 and fields2.
+   *
+   * @param fields1
+   * The partitioning expressions for the first input
+   * @param fields2
+   * The partitioning expressions for the second input
+   * @return The partitioned [[ConnectedStreams]]
+   */
+  def partitionByHash(fields1: Array[String], fields2: Array[String]):
+  ConnectedStreams[IN1, IN2] = {
+    javaStream.partitionByHash(fields1, fields2)
+  }
+
+  /**
+   * PartitionBy operation for connected data streams. Partitions the elements of
+   * input1 and input2 using the keys extracted by fun1 and fun2.
+   *
+   * @param fun1
+   * The function used for partitioning the first input
+   * @param fun2
+   * The function used for partitioning the second input
+   * @return The partitioned [[ConnectedStreams]]
+   */
+  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  ConnectedStreams[IN1, IN2] = {
+
+    val cleanFun1 = clean(fun1)
+    val cleanFun2 = clean(fun2)
+
+    val keyExtractor1 = new KeySelector[IN1, K] {
+      def getKey(in: IN1) = cleanFun1(in)
+    }
+    val keyExtractor2 = new KeySelector[IN2, L] {
+      def getKey(in: IN2) = cleanFun2(in)
+    }
+
+    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
+  }
+
+  /**
+   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
+   */
+  private[flink] def clean[F <: AnyRef](f: F): F = {
+    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index ca5fc48..c9aee61 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -203,11 +203,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.union(dataStreams.map(_.getJavaStream): _*)
 
   /**
-   * Creates a new ConnectedDataStream by connecting
+   * Creates a new ConnectedStreams by connecting
    * DataStream outputs of different type with each other. The
    * DataStreams connected using this operators can be used with CoFunctions.
    */
-  def connect[T2](dataStream: DataStream[T2]): ConnectedDataStream[T, T2] = 
+  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
     javaStream.connect(dataStream.getJavaStream)
 
 
@@ -408,7 +408,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * stream of the iterative part.
    * 
    * The input stream of the iterate operator and the feedback stream will be treated
-   * as a ConnectedDataStream where the the input is connected with the feedback stream.
+   * as a ConnectedStreams where the input is connected with the feedback stream.
    * 
    * This allows the user to distinguish standard input from feedback inputs.
    * 
@@ -420,7 +420,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * to 0 then the iteration sources will indefinitely, so the job must be killed to stop.
    *
    */
-  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedDataStream[T, F] => 
+  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
     (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
     val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
     val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
@@ -712,21 +712,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
     new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
 
   /**
-   * Initiates a temporal cross transformation that builds all pair
-   * combinations of elements of both DataStreams, i.e., it builds a Cartesian
-   * product.
-   *
-   * This method returns a StreamJoinOperator on which the
-   * .onWindow(..) should be called to define the
-   * window, and then the .where(..) and .equalTo(..) methods can be used to defin
-   * the join keys.</p> The user can also use the apply method of the returned JoinedStream
-   * to use custom join function.
-   *
-   */
-  def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
-    new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
-
-  /**
    * Writes a DataStream to the standard output stream (stdout). For each
    * element of the DataStream the result of .toString is
    * written.

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
deleted file mode 100644
index 0060a9f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.scala
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.functions.CrossFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.CrossWindowFunction
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow
-
-import scala.reflect.ClassTag
-
-class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
-  TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
-
-  override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
-
-    val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
-      (l: I1, r: I2) => (l, r))
-
-
-    val returnType = createTuple2TypeInformation[I1, I2](input1.getType, input2.getType)
-    val javaStream = input1.connect(input2).addGeneralWindowCombine(
-      crossWindowFunction,
-      returnType, windowSize,
-      slideInterval, timeStamp1, timeStamp2)
-
-    new StreamCrossOperator.CrossWindow[I1, I2](this, javaStream)
-  }
-}
-object StreamCrossOperator {
-
-  private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
-                                           javaStream: JavaStream[(I1, I2)]) extends
-    DataStream[(I1, I2)](javaStream) with TemporalWindow[CrossWindow[I1, I2]] {
-
-    /**
-     * Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
-     * call will be emitted.
-     *
-     */
-    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
-
-      val cleanCrossWindowFunction = clean(getCrossWindowFunction(op, fun))
-
-      op.input1.connect(op.input2).addGeneralWindowCombine(
-        cleanCrossWindowFunction,
-        implicitly[TypeInformation[R]],
-        op.windowSize,
-        op.slideInterval,
-        op.timeStamp1,
-        op.timeStamp2)
-    }
-    
-    override def every(length: Long, timeUnit: TimeUnit): CrossWindow[I1, I2] = {
-      every(timeUnit.toMillis(length))
-    }
-
-    override def every(length: Long): CrossWindow[I1, I2] = {
-      val graph = javaStream.getExecutionEnvironment().getStreamGraph()
-      val operator = graph.getStreamNode(javaStream.getId()).getOperator()
-      operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length)
-      this
-    }
-  }
-
-  private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
-                                                       crossFunction: (I1, I2) => R):
-  CrossWindowFunction[I1, I2, R] = {
-    require(crossFunction != null, "Join function must not be null.")
-
-    val cleanFun = op.input1.clean(crossFunction)
-    val crossFun = new CrossFunction[I1, I2, R] {
-      override def cross(first: I1, second: I2): R = {
-        cleanFun(first, second)
-      }
-    }
-
-    new CrossWindowFunction[I1, I2, R](crossFun)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index e2be44f..f584767 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
 import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.JoinWindowFunction
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 
 import scala.Array.canBuildFrom
@@ -151,10 +150,11 @@ object StreamJoinOperator {
 
     private def createJoinOperator(): JavaStream[(I1, I2)] = {
 
-      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
-      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
-        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
-          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+//      val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType)
+//      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+//        .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+//          returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+      null
     }
   }
 
@@ -172,14 +172,15 @@ object StreamJoinOperator {
 
       val cleanFun = clean(getJoinWindowFunction(jp, fun))
 
-      op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
-        .addGeneralWindowCombine[R](
-          cleanFun,
-          implicitly[TypeInformation[R]],
-          op.windowSize,
-          op.slideInterval,
-          op.timeStamp1,
-          op.timeStamp2)
+//      op.input1.groupBy(jp.keys1).connect(op.input2.groupBy(jp.keys2))
+//        .addGeneralWindowCombine[R](
+//          cleanFun,
+//          implicitly[TypeInformation[R]],
+//          op.windowSize,
+//          op.slideInterval,
+//          op.timeStamp1,
+//          op.timeStamp2)
+      null
     }
   }
 
@@ -195,7 +196,8 @@ object StreamJoinOperator {
       }
     }
 
-    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+//    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+    null
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 59843e2..b8a3b94 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWStream }
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
-import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
+import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => JavaConStream }
 import org.apache.flink.streaming.api.datastream.{ GroupedDataStream => GroupedJavaStream }
 import language.implicitConversions
 
@@ -48,7 +48,7 @@ package object scala {
     new SplitDataStream[R](javaStream)
 
   implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: JavaConStream[IN1, IN2]):
-  ConnectedDataStream[IN1, IN2] = new ConnectedDataStream[IN1, IN2](javaStream)
+  ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
 
   implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
     StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 8b4d527..606aac5 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -84,7 +84,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
   /**
    * Tests that {@link DataStream#groupBy} and {@link DataStream#partitionBy(KeySelector)} result in
-   * different and correct topologies. Does the some for the {@link ConnectedDataStream}.
+   * different and correct topologies. Does the same for the {@link ConnectedStreams}.
    */
   @Test
   def testPartitioning(): Unit = {
@@ -144,21 +144,21 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid2)))
     assert(isCustomPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, cpid3)))
 
-    //Testing ConnectedDataStream grouping
-    val connectedGroup1: ConnectedDataStream[_, _] = connected.groupBy(0, 0)
+    //Testing ConnectedStreams grouping
+    val connectedGroup1: ConnectedStreams[_, _] = connected.groupBy(0, 0)
     val downStreamId1: Integer = createDownStreamId(connectedGroup1)
 
-    val connectedGroup2: ConnectedDataStream[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
+    val connectedGroup2: ConnectedStreams[_, _] = connected.groupBy(Array[Int](0), Array[Int](0))
     val downStreamId2: Integer = createDownStreamId(connectedGroup2)
 
-    val connectedGroup3: ConnectedDataStream[_, _] = connected.groupBy("_1", "_1")
+    val connectedGroup3: ConnectedStreams[_, _] = connected.groupBy("_1", "_1")
     val downStreamId3: Integer = createDownStreamId(connectedGroup3)
 
-    val connectedGroup4: ConnectedDataStream[_, _] =
+    val connectedGroup4: ConnectedStreams[_, _] =
       connected.groupBy(Array[String]("_1"), Array[String]("_1"))
     val downStreamId4: Integer = createDownStreamId(connectedGroup4)
 
-    val connectedGroup5: ConnectedDataStream[_, _] = connected.groupBy(x => x._1, x => x._1)
+    val connectedGroup5: ConnectedStreams[_, _] = connected.groupBy(x => x._1, x => x._1)
     val downStreamId5: Integer = createDownStreamId(connectedGroup5)
 
     assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId1)))
@@ -176,22 +176,22 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     assert(isPartitioned(env.getStreamGraph.getStreamEdge(src1.getId, downStreamId5)))
     assert(isPartitioned(env.getStreamGraph.getStreamEdge(src2.getId, downStreamId5)))
 
-    //Testing ConnectedDataStream partitioning
-    val connectedPartition1: ConnectedDataStream[_, _] = connected.partitionByHash(0, 0)
+    //Testing ConnectedStreams partitioning
+    val connectedPartition1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0)
     val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
 
-    val connectedPartition2: ConnectedDataStream[_, _] =
+    val connectedPartition2: ConnectedStreams[_, _] =
       connected.partitionByHash(Array[Int](0), Array[Int](0))
     val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
 
-    val connectedPartition3: ConnectedDataStream[_, _] = connected.partitionByHash("_1", "_1")
+    val connectedPartition3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1")
     val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
 
-    val connectedPartition4: ConnectedDataStream[_, _] =
+    val connectedPartition4: ConnectedStreams[_, _] =
       connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
     val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
 
-    val connectedPartition5: ConnectedDataStream[_, _] =
+    val connectedPartition5: ConnectedStreams[_, _] =
       connected.partitionByHash(x => x._1, x => x._1)
     val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
 
@@ -492,7 +492,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // we need to rebalance before iteration
     val source = env.fromElements(1, 2, 3).map { t: Int => t }
 
-    val iterated = source.iterate((input: ConnectedDataStream[Int, String]) => {
+    val iterated = source.iterate((input: ConnectedStreams[Int, String]) => {
       val head = input.map(i => (i + 1).toString, s => s)
       (head.filter(_ == "2"), head.filter(_ != "2"))
     }, 1000).print()
@@ -501,7 +501,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
       (input.map(_ + 1), input.map(_.toString)), 2000)
 
     try {
-      val invalid = source.iterate((input: ConnectedDataStream[Int, String]) => {
+      val invalid = source.iterate((input: ConnectedStreams[Int, String]) => {
         val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
         (head.filter(_ == "2"), head.filter(_ != "2"))
       }, 1000).print()
@@ -546,7 +546,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     return dataStream.print.getTransformation.getId
   }
 
-  private def createDownStreamId(dataStream: ConnectedDataStream[_, _]): Integer = {
+  private def createDownStreamId(dataStream: ConnectedStreams[_, _]): Integer = {
     val m = dataStream.map(x => 0, x => 0)
     m.print()
     m.getId

http://git-wip-us.apache.org/repos/asf/flink/blob/23d8e264/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index f53b986..66fe197 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -43,11 +43,11 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.datastream.DataStream.transform",
       "org.apache.flink.streaming.api.datastream.DataStream.getTransformation",
       "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy",
-      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getExecutionEnvironment",
-      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType1",
-      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.getType2",
-      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.addGeneralWindowCombine",
-      "org.apache.flink.streaming.api.datastream.ConnectedDataStream.transform",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine",
+      "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform",
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType",
       "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig",
 
@@ -104,9 +104,9 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       classOf[DataStream[_]])
 
     checkMethods(
-      "ConnectedDataStream", "ConnectedDataStream",
-      classOf[org.apache.flink.streaming.api.datastream.ConnectedDataStream[_,_]],
-      classOf[ConnectedDataStream[_,_]])
+      "ConnectedStreams", "ConnectedStreams",
+      classOf[org.apache.flink.streaming.api.datastream.ConnectedStreams[_,_]],
+      classOf[ConnectedStreams[_,_]])
 
     checkMethods(
       "SplitDataStream", "SplitDataStream",
@@ -114,11 +114,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       classOf[SplitDataStream[_]])
 
     checkMethods(
-      "StreamCrossOperator", "StreamCrossOperator",
-      classOf[org.apache.flink.streaming.api.datastream.temporal.StreamCrossOperator[_,_]],
-      classOf[StreamCrossOperator[_,_]])
-
-    checkMethods(
       "StreamJoinOperator", "StreamJoinOperator",
       classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]],
       classOf[StreamJoinOperator[_,_]])