You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/29 16:05:41 UTC
[3/3] flink git commit: [streaming] Cleanup for projection update
[streaming] Cleanup for projection update
Closes #630
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40be172e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40be172e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40be172e
Branch: refs/heads/master
Commit: 40be172e4b379959308860dae4ba36d796c34114
Parents: 8c46155
Author: mbalassi <mb...@apache.org>
Authored: Wed Apr 29 15:16:25 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Wed Apr 29 15:26:00 2015 +0200
----------------------------------------------------------------------
.../api/datastream/StreamProjection.java | 2 -
.../streaming/api/operators/ProjectTest.java | 61 +++++++++++++++++---
.../api/operators/ProjectWithoutClassTest.java | 22 -------
3 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index c19dbb2..447b1fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -48,8 +48,6 @@ import org.apache.flink.api.java.tuple.Tuple9;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.operators.StreamProject;
-import java.util.Arrays;
-
public class StreamProjection<IN> {
private DataStream<IN> dataStream;
http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
index 0a712e0..035abe6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
@@ -18,39 +18,45 @@
package org.apache.flink.streaming.api.operators;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.MockContext;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class ProjectTest implements Serializable {
private static final long serialVersionUID = 1L;
@Test
- public void test() {
+ public void operatorTest() {
TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
- .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
- 4));
+ .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
- int[] fields = new int[] { 4, 4, 3 };
- // Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
+ int[] fields = new int[]{4, 4, 3};
@SuppressWarnings("unchecked")
StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- fields,
- new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
- .extractFieldTypes(fields, inType)));
+ fields,
+ new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection
+ .extractFieldTypes(fields, inType)));
List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
String, Integer>>();
@@ -67,4 +73,43 @@ public class ProjectTest implements Serializable {
assertEquals(expected, MockContext.createAndExecute(operator, input));
}
+
+
+ // tests using projection from the API without explicitly specifying the types
+ private static final long MEMORY_SIZE = 32;
+ private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
+ private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
+
+ @Test
+ public void APIWithoutTypesTest() {
+
+ for (Long i = 1L; i < 11L; i++) {
+ expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
+ }
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+
+ env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+ @Override
+ public Tuple3<Long, Character, Double> map(Long value) throws Exception {
+ return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
+ }
+ })
+ .project(0, 2)
+ .addSink(new SinkFunction<Tuple>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void invoke(Tuple value) throws Exception {
+ actual.add( (Tuple2<Long,Double>) value);
+ }
+ });
+
+ try {
+ env.execute();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ assertEquals(expected, actual);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/40be172e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
deleted file mode 100644
index f924cfb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class ProjectWithoutClassTest {
-
- public static void main(String[] args) throws Exception {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.generateSequence(1, 100).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
-
- @Override
- public Tuple3<Long, Character, Double> map(Long value) throws Exception {
- return new Tuple3<Long, Character, Double>(value, 'c', (double) value);
- }
- }).project(0,2).print();
-
- env.execute("ProjectWithoutClassTest");
- }
-}