You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:42 UTC

[05/51] [abbrv] git commit: [streaming] Fixed StreamCollectorTest

[streaming] Fixed StreamCollectorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/11f62c1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/11f62c1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/11f62c1e

Branch: refs/heads/master
Commit: 11f62c1e32654ccda6d2ac1588cc4b6afa74e32f
Parents: c21f0e3
Author: ghermann <re...@gmail.com>
Authored: Mon Jul 21 11:23:32 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/collector/StreamCollectorTest.java    | 10 ++++++----
 .../streaming/api/streamcomponent/MockRecordWriter.java | 12 ++++++------
 .../flink/streaming/util/MockRecordWriterFactory.java   |  4 ++--
 3 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 8fc8704..7c7f593 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -19,27 +19,29 @@
 
 package org.apache.flink.streaming.api.collector;
 
+import static org.junit.Assert.assertArrayEquals;
+
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.MockRecordWriterFactory;
 import org.junit.Test;
 
 public class StreamCollectorTest {
 
-
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
 	@Test
 	public void testCollect() {
 		MockRecordWriter recWriter = MockRecordWriterFactory.create();
 
-		StreamCollector collector = new StreamCollector(2, null);
+		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
 		collector.addOutput(recWriter, null);
 		collector.collect(new Tuple1<Integer>(3));
 		collector.collect(new Tuple1<Integer>(4));
 		collector.collect(new Tuple1<Integer>(5));
 		collector.collect(new Tuple1<Integer>(6));
 
+		assertArrayEquals(new Integer[] {3, 4, 5, 6}, recWriter.emittedRecords.toArray());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
index e4574f1..e99518c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
@@ -22,26 +22,26 @@ package org.apache.flink.streaming.api.streamcomponent;
 import java.util.ArrayList;
 
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 
-public class MockRecordWriter extends RecordWriter<StreamRecord<Tuple1<Integer>>> {
+public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
 
-	public ArrayList<StreamRecord<Tuple1<Integer>>> emittedRecords;
+	public ArrayList<Integer> emittedRecords;
 
 	public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
 		super(inputBase);
 	}
 
 	public boolean initList() {
-		emittedRecords = new ArrayList<StreamRecord<Tuple1<Integer>>>();
+		emittedRecords = new ArrayList<Integer>();
 		return true;
 	}
 	
 	@Override
-	public void emit(StreamRecord<Tuple1<Integer>> record) {
-		emittedRecords.add(record);
+	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
+		emittedRecords.add(record.getInstance().getTuple().f0);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
index ed018eb..4b85119 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
@@ -22,8 +22,8 @@ package org.apache.flink.streaming.util;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.mockito.Mockito;
 
 public class MockRecordWriterFactory {
@@ -33,7 +33,7 @@ public class MockRecordWriterFactory {
 		MockRecordWriter recWriter = mock(MockRecordWriter.class);
 		
 		Mockito.when(recWriter.initList()).thenCallRealMethod();
-		doCallRealMethod().when(recWriter).emit(Mockito.any(StreamRecord.class));
+		doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class));
 		
 		recWriter.initList();