You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:42 UTC
[05/51] [abbrv] git commit: [streaming] Fixed StreamCollectorTest
[streaming] Fixed StreamCollectorTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/11f62c1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/11f62c1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/11f62c1e
Branch: refs/heads/master
Commit: 11f62c1e32654ccda6d2ac1588cc4b6afa74e32f
Parents: c21f0e3
Author: ghermann <re...@gmail.com>
Authored: Mon Jul 21 11:23:32 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:14:13 2014 +0200
----------------------------------------------------------------------
.../streaming/api/collector/StreamCollectorTest.java | 10 ++++++----
.../streaming/api/streamcomponent/MockRecordWriter.java | 12 ++++++------
.../flink/streaming/util/MockRecordWriterFactory.java | 4 ++--
3 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 8fc8704..7c7f593 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -19,27 +19,29 @@
package org.apache.flink.streaming.api.collector;
+import static org.junit.Assert.assertArrayEquals;
+
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockRecordWriterFactory;
import org.junit.Test;
public class StreamCollectorTest {
-
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testCollect() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
- StreamCollector collector = new StreamCollector(2, null);
+ StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
collector.addOutput(recWriter, null);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
collector.collect(new Tuple1<Integer>(6));
+ assertArrayEquals(new Integer[] {3, 4, 5, 6}, recWriter.emittedRecords.toArray());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
index e4574f1..e99518c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
@@ -22,26 +22,26 @@ package org.apache.flink.streaming.api.streamcomponent;
import java.util.ArrayList;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.io.network.api.RecordWriter;
-public class MockRecordWriter extends RecordWriter<StreamRecord<Tuple1<Integer>>> {
+public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamRecord<Tuple1<Integer>>>> {
- public ArrayList<StreamRecord<Tuple1<Integer>>> emittedRecords;
+ public ArrayList<Integer> emittedRecords;
public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord<Tuple1<Integer>>> outputClass) {
super(inputBase);
}
public boolean initList() {
- emittedRecords = new ArrayList<StreamRecord<Tuple1<Integer>>>();
+ emittedRecords = new ArrayList<Integer>();
return true;
}
@Override
- public void emit(StreamRecord<Tuple1<Integer>> record) {
- emittedRecords.add(record);
+ public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
+ emittedRecords.add(record.getInstance().getTuple().f0);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/11f62c1e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
index ed018eb..4b85119 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockRecordWriterFactory.java
@@ -22,8 +22,8 @@ package org.apache.flink.streaming.util;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.mockito.Mockito;
public class MockRecordWriterFactory {
@@ -33,7 +33,7 @@ public class MockRecordWriterFactory {
MockRecordWriter recWriter = mock(MockRecordWriter.class);
Mockito.when(recWriter.initList()).thenCallRealMethod();
- doCallRealMethod().when(recWriter).emit(Mockito.any(StreamRecord.class));
+ doCallRealMethod().when(recWriter).emit(Mockito.any(SerializationDelegate.class));
recWriter.initList();