You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/29 16:05:41 UTC

[3/3] flink git commit: [streaming] Cleanup for projection update

[streaming] Cleanup for projection update

Closes #630


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40be172e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40be172e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40be172e

Branch: refs/heads/master
Commit: 40be172e4b379959308860dae4ba36d796c34114
Parents: 8c46155
Author: mbalassi <mb...@apache.org>
Authored: Wed Apr 29 15:16:25 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Apr 29 15:26:00 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/StreamProjection.java        |  2 -
 .../streaming/api/operators/ProjectTest.java    | 61 +++++++++++++++++---
 .../api/operators/ProjectWithoutClassTest.java  | 22 -------
 3 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index c19dbb2..447b1fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -48,8 +48,6 @@ import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.operators.StreamProject;
 
-import java.util.Arrays;
-
 public class StreamProjection<IN> {
 
 	private DataStream<IN> dataStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index 0a712e0..035abe6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -18,39 +18,45 @@
 package org.apache.flink.streaming.api.operators;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.junit.Test;
 
 public class ProjectTest implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	@Test
-	public void test() {
+	public void operatorTest() {
 
 		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
-				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
-						4));
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
 
-		int[] fields = new int[] { 4, 4, 3 };
-		// Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
+		int[] fields = new int[]{4, 4, 3};
 
 		@SuppressWarnings("unchecked")
 		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
 				new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-				fields,
-				new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
-						.extractFieldTypes(fields, inType)));
+						fields,
+						new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
+								.extractFieldTypes(fields, inType)));
 
 		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
 				String, Integer>>();
@@ -67,4 +73,43 @@ public class ProjectTest implements Serializable {
 
 		assertEquals(expected, MockContext.createAndExecute(operator, input));
 	}
+
+
+	// tests using projection from the API without explicitly specifying the types
+	private static final long MEMORY_SIZE = 32;
+	private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
+	private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
+
+	@Test
+	public void APIWithoutTypesTest() {
+
+		for (Long i = 1L; i < 11L; i++) {
+			expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
+		}
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+
+		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+				@Override
+				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
+					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
+				}
+			})
+			.project(0, 2)
+			.addSink(new SinkFunction<Tuple>() {
+				@Override
+				@SuppressWarnings("unchecked")
+				public void invoke(Tuple value) throws Exception {
+					actual.add( (Tuple2<Long,Double>) value);
+				}
+			});
+
+		try {
+			env.execute();
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		assertEquals(expected, actual);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
deleted file mode 100644
index f924cfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class ProjectWithoutClassTest {
-
-	public static void main(String[] args) throws Exception {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		env.generateSequence(1, 100).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
-
-			@Override
-			public Tuple3<Long, Character, Double> map(Long value) throws Exception {
-				return new Tuple3<Long, Character, Double>(value, 'c', (double) value);
-			}
-		}).project(0,2).print();
-
-		env.execute("ProjectWithoutClassTest");
-	}
-}