You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/17 21:38:06 UTC
[4/5] incubator-flink git commit: [FLINK-610] Replace Avro by Kryo as
the GenericType serializer
[FLINK-610] Replace Avro by Kryo as the GenericType serializer
The performance of data-intensive jobs using Kryo is probably going to be slow.
Set correct classloader
try to use Kryo.copy() with fallback to serialization copy
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a70aa67a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a70aa67a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a70aa67a
Branch: refs/heads/master
Commit: a70aa67a0881afc5d66329d46fe536d7a0b89fa8
Parents: 1c1c83b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Dec 16 11:30:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 17 20:44:14 2014 +0100
----------------------------------------------------------------------
.../common/typeutils/SerializerTestBase.java | 5 +-
.../typeutils/SerializerTestInstance.java | 2 -
flink-java/pom.xml | 6 +
.../api/java/typeutils/GenericTypeInfo.java | 19 +--
.../flink/api/java/typeutils/TypeExtractor.java | 17 +-
.../java/typeutils/runtime/KryoSerializer.java | 58 ++++---
.../type/extractor/PojoTypeExtractionTest.java | 166 +++++++++++--------
.../AbstractGenericTypeSerializerTest.java | 2 +-
.../runtime/KryoGenericTypeComparatorTest.java | 2 +-
.../runtime/KryoGenericTypeSerializerTest.java | 44 ++++-
flink-scala/pom.xml | 8 +
.../runtime/KryoGenericTypeSerializerTest.scala | 128 ++++++++++++++
.../javaApiOperators/GroupReduceITCase.java | 71 +++++++-
.../util/CollectionDataSets.java | 77 +++++++++
14 files changed, 481 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index d509284..5122af9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -59,7 +59,10 @@ public abstract class SerializerTestBase<T> {
public void testInstantiate() {
try {
TypeSerializer<T> serializer = getSerializer();
-
+ if(serializer.getClass().getName().endsWith("KryoSerializer")) {
+ // the kryo serializer will return null. We ignore this test for Kryo.
+ return;
+ }
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index c48e879..7f65995 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.common.typeutils;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
public class SerializerTestInstance<T> extends SerializerTestBase<T> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 3b80b3e..6865cc9 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -58,6 +58,12 @@ under the License.
<artifactId>asm</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill_2.10</artifactId>
+ <version>0.5.1</version>
+ </dependency>
+
<!-- guava needs to be in "provided" scope, to make sure it is not included into the jars by the shading -->
<dependency>
<groupId>com.google.guava</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 8a1406b..5bc6cb9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,26 +22,16 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
-
-import java.util.Collection;
+import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private final Class<T> typeClass;
- private final static Class<?>[] unsupportedByAvro = new Class[] {Collection.class};
-
+
public GenericTypeInfo(Class<T> typeClass) {
this.typeClass = typeClass;
- for (Class<?> unsupported: unsupportedByAvro) {
- if(unsupported.isAssignableFrom(typeClass)) {
- throw new RuntimeException("The type '"+typeClass+"' is currently not supported " +
- "by the Avro Serializer that Flink is using for serializing " +
- "arbitrary objects");
- }
- }
}
@Override
@@ -76,10 +66,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
public TypeSerializer<T> createSerializer() {
- // NOTE: The TypeExtractor / pojo logic is assuming that we are using a Avro Serializer here
- // in particular classes implementing GenericContainer are handled as GenericTypeInfos
- // (this will probably not work with Kryo)
- return new AvroSerializer<T>(this.typeClass);
+ return new KryoSerializer<T>(this.typeClass);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 3bceac5..e52e2af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,7 +30,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.avro.generic.GenericContainer;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
@@ -947,16 +946,16 @@ public class TypeExtractor {
// special case handling for Class, this should not be handled by the POJO logic
return new GenericTypeInfo<X>(clazz);
}
- if(GenericContainer.class.isAssignableFrom(clazz)) {
- // this is a type generated by Avro. GenericTypeInfo is able to handle this case because its using Avro.
- return new GenericTypeInfo<X>(clazz);
- }
+
try {
TypeInformation<X> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), clazzTypeHint);
if (pojoType != null) {
return pojoType;
}
} catch (InvalidTypesException e) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Unable to handle type "+clazz+" as POJO. Message: "+e.getMessage(), e);
+ }
// ignore and create generic type info
}
@@ -1051,9 +1050,11 @@ public class TypeExtractor {
fieldTypeHierarchy.add(fieldType);
pojoFields.add(new PojoField(field, createTypeInfoWithTypeHierarchy(fieldTypeHierarchy, fieldType, null, null) ));
} catch (InvalidTypesException e) {
- //pojoFields.add(new PojoField(field, new GenericTypeInfo( Object.class ))); // we need kryo to properly serialize this
- throw new InvalidTypesException("Flink is currently unable to serialize this type: "+fieldType+""
- + "\nThe system is internally using the Avro serializer which is not able to handle that type.", e);
+ Class<?> genericClass = Object.class;
+ if(isClassType(fieldType)) {
+ genericClass = typeToClass(fieldType);
+ }
+ pojoFields.add(new PojoField(field, new GenericTypeInfo( genericClass )));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 7a98abf..f2c5848 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -19,20 +19,23 @@
package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.twitter.chill.ScalaKryoInstantiator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class KryoSerializer<T> extends TypeSerializer<T> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final Class<T> type;
- private final Class<? extends T> typeToInstantiate;
private transient Kryo kryo;
private transient T copyInstance;
@@ -44,21 +47,13 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private transient Output output;
public KryoSerializer(Class<T> type){
- this(type,type);
- }
-
- public KryoSerializer(Class<T> type, Class<? extends T> typeToInstantiate){
- if(type == null || typeToInstantiate == null){
+ if(type == null){
throw new NullPointerException("Type class cannot be null.");
}
-
this.type = type;
- this.typeToInstantiate = typeToInstantiate;
- kryo = new Kryo();
- kryo.setAsmEnabled(true);
- kryo.register(type);
}
+
@Override
public boolean isImmutableType() {
return false;
@@ -71,20 +66,36 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
@Override
public T createInstance() {
- checkKryoInitialized();
- return kryo.newInstance(typeToInstantiate);
+ return null;
}
@Override
public T copy(T from) {
+ if(from == null) {
+ return null;
+ }
checkKryoInitialized();
- return kryo.copy(from);
+ try {
+ return kryo.copy(from);
+ } catch(KryoException ke) {
+ // kryo was unable to copy it, so we do it through serialization:
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
+ Output output = new Output(baout);
+
+ kryo.writeObject(output, from);
+
+ output.flush();
+
+ ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+ Input input = new Input(bain);
+
+ return (T)kryo.readObject(input, from.getClass());
+ }
}
@Override
public T copy(T from, T reuse) {
- checkKryoInitialized();
- return kryo.copy(from);
+ return copy(from);
}
@Override
@@ -101,7 +112,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
previousOut = target;
}
- kryo.writeObject(output, record);
+ kryo.writeClassAndObject(output, record);
output.flush();
}
@@ -113,7 +124,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
input = new NoFetchingInput(inputStream);
previousIn = source;
}
- return kryo.readObject(input, typeToInstantiate);
+ return (T) kryo.readClassAndObject(input);
}
@Override
@@ -136,14 +147,14 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
@Override
public int hashCode() {
- return type.hashCode() + 31 * typeToInstantiate.hashCode();
+ return type.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
- return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+ return other.type == this.type;
} else {
return false;
}
@@ -153,9 +164,10 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
- this.kryo = new Kryo();
- this.kryo.setAsmEnabled(true);
+ this.kryo = new ScalaKryoInstantiator().newKryo();
+ this.kryo.setRegistrationRequired(false);
this.kryo.register(type);
+ this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index e5ac1ca..893e63c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -45,7 +45,7 @@ import com.google.common.collect.HashMultiset;
/**
* Pojo Type tests
- *
+ *
* A Pojo is a bean-style class with getters, setters and empty ctor
* OR a class with all fields public (or for every private field, there has to be a public getter/setter)
* everything else is a generic type (that can't be used for field selection)
@@ -55,12 +55,12 @@ public class PojoTypeExtractionTest {
public static class HasDuplicateField extends WC {
private int count; // duplicate
}
-
+
@Test(expected=RuntimeException.class)
public void testDuplicateFieldException() {
TypeExtractor.createTypeInfo(HasDuplicateField.class);
}
-
+
// test with correct pojo types
public static class WC { // is a pojo
public ComplexNestedClass complex; // is a pojo
@@ -84,6 +84,7 @@ public class PojoTypeExtractionTest {
public Tuple3<Long, Long, String> word; //Tuple Type with three basic types
public Object nothing; // generic type
public MyWritable hadoopCitizen; // writableType
+ public List<String> collection;
}
// all public test
@@ -92,7 +93,7 @@ public class PojoTypeExtractionTest {
public HashMultiset<Integer> fancyIds; // generic type
public String[] fancyArray; // generic type
}
-
+
public static class ParentSettingGenerics extends PojoWithGenerics<Integer, Long> {
public String field3;
}
@@ -101,16 +102,16 @@ public class PojoTypeExtractionTest {
public T1 field1;
public T2 field2;
}
-
+
public static class ComplexHierarchyTop extends ComplexHierarchy<Tuple1<String>> {}
public static class ComplexHierarchy<T> extends PojoWithGenerics<FromTuple,T> {}
-
+
// extends from Tuple and adds a field
public static class FromTuple extends Tuple3<String, String, Long> {
private static final long serialVersionUID = 1L;
public int special;
}
-
+
public static class IncorrectPojo {
private int isPrivate;
public int getIsPrivate() {
@@ -118,7 +119,7 @@ public class PojoTypeExtractionTest {
}
// setter is missing (intentional)
}
-
+
// correct pojo
public static class BeanStylePojo {
public String abc;
@@ -136,7 +137,7 @@ public class PojoTypeExtractionTest {
this.a = a;
}
}
-
+
// in this test, the location of the getters and setters is mixed across the type hierarchy.
public static class TypedPojoGetterSetterCheck extends GenericPojoGetterSetterCheck<String> {
public void setPackageProtected(String in) {
@@ -149,50 +150,64 @@ public class PojoTypeExtractionTest {
return packageProtected;
}
}
-
+
@Test
public void testIncorrectPojos() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(IncorrectPojo.class);
Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
-
+
typeForClass = TypeExtractor.createTypeInfo(WrongCtorPojo.class);
Assert.assertTrue(typeForClass instanceof GenericTypeInfo<?>);
}
-
+
@Test
public void testCorrectPojos() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(BeanStylePojo.class);
Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
-
+
typeForClass = TypeExtractor.createTypeInfo(TypedPojoGetterSetterCheck.class);
Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
}
-
+
@Test
public void testPojoWC() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(WC.class);
checkWCPojoAsserts(typeForClass);
-
+
WC t = new WC();
t.complex = new ComplexNestedClass();
TypeInformation<?> typeForObject = TypeExtractor.getForObject(t);
checkWCPojoAsserts(typeForObject);
}
-
+
private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
Assert.assertFalse(typeInfo.isBasicType());
Assert.assertFalse(typeInfo.isTupleType());
- Assert.assertEquals(9, typeInfo.getTotalFields());
+ Assert.assertEquals(10, typeInfo.getTotalFields());
Assert.assertTrue(typeInfo instanceof PojoTypeInfo);
PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) typeInfo;
-
+
List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>();
- String[] fields = {"count","complex.date", "complex.hadoopCitizen", "complex.nothing",
- "complex.someFloat", "complex.someNumber", "complex.word.f0",
- "complex.word.f1", "complex.word.f2"};
- int[] positions = {8,0,1,2,
- 3,4,5,
- 6,7};
+ String[] fields = {"count",
+ "complex.date",
+ "complex.hadoopCitizen",
+ "complex.collection",
+ "complex.nothing",
+ "complex.someFloat",
+ "complex.someNumber",
+ "complex.word.f0",
+ "complex.word.f1",
+ "complex.word.f2"};
+ int[] positions = {9,
+ 1,
+ 2,
+ 0,
+ 3,
+ 4,
+ 5,
+ 6,
+ 7,
+ 8};
Assert.assertEquals(fields.length, positions.length);
for(int i = 0; i < fields.length; i++) {
pojoType.getKey(fields[i], 0, ffd);
@@ -200,86 +215,93 @@ public class PojoTypeExtractionTest {
Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition());
ffd.clear();
}
-
+
pojoType.getKey("complex.word.*", 0, ffd);
Assert.assertEquals(3, ffd.size());
// check if it returns 5,6,7
for(FlatFieldDescriptor ffdE : ffd) {
final int pos = ffdE.getPosition();
- Assert.assertTrue(pos <= 7 );
- Assert.assertTrue(5 <= pos );
- if(pos == 5) {
- Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
- }
+ Assert.assertTrue(pos <= 8 );
+ Assert.assertTrue(6 <= pos );
if(pos == 6) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 7) {
+ Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+ }
+ if(pos == 8) {
Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
}
}
ffd.clear();
-
+
// scala style full tuple selection for pojos
pojoType.getKey("complex.word._", 0, ffd);
Assert.assertEquals(3, ffd.size());
ffd.clear();
-
+
pojoType.getKey("complex.*", 0, ffd);
- Assert.assertEquals(8, ffd.size());
+ Assert.assertEquals(9, ffd.size());
// check if it returns 0-7
for(FlatFieldDescriptor ffdE : ffd) {
final int pos = ffdE.getPosition();
- Assert.assertTrue(ffdE.getPosition() <= 7 );
+ Assert.assertTrue(ffdE.getPosition() <= 8 );
Assert.assertTrue(0 <= ffdE.getPosition() );
+
if(pos == 0) {
- Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(List.class, ffdE.getType().getTypeClass());
}
if(pos == 1) {
- Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(Date.class, ffdE.getType().getTypeClass());
}
if(pos == 2) {
- Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(MyWritable.class, ffdE.getType().getTypeClass());
}
if(pos == 3) {
- Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(Object.class, ffdE.getType().getTypeClass());
}
if(pos == 4) {
- Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(Float.class, ffdE.getType().getTypeClass());
}
if(pos == 5) {
- Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+ Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
}
if(pos == 6) {
Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
}
if(pos == 7) {
+ Assert.assertEquals(Long.class, ffdE.getType().getTypeClass());
+ }
+ if(pos == 8) {
Assert.assertEquals(String.class, ffdE.getType().getTypeClass());
}
+ if(pos == 9) {
+ Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
+ }
}
ffd.clear();
-
+
pojoType.getKey("*", 0, ffd);
- Assert.assertEquals(9, ffd.size());
+ Assert.assertEquals(10, ffd.size());
// check if it returns 0-8
for(FlatFieldDescriptor ffdE : ffd) {
- Assert.assertTrue(ffdE.getPosition() <= 8 );
+ Assert.assertTrue(ffdE.getPosition() <= 9 );
Assert.assertTrue(0 <= ffdE.getPosition() );
- if(ffdE.getPosition() == 8) {
+ if(ffdE.getPosition() == 9) {
Assert.assertEquals(Integer.class, ffdE.getType().getTypeClass());
}
}
ffd.clear();
-
+
TypeInformation<?> typeComplexNested = pojoType.getTypeAt(0); // ComplexNestedClass complex
Assert.assertTrue(typeComplexNested instanceof PojoTypeInfo);
-
- Assert.assertEquals(6, typeComplexNested.getArity());
- Assert.assertEquals(8, typeComplexNested.getTotalFields());
+
+ Assert.assertEquals(7, typeComplexNested.getArity());
+ Assert.assertEquals(9, typeComplexNested.getTotalFields());
PojoTypeInfo<?> pojoTypeComplexNested = (PojoTypeInfo<?>) typeComplexNested;
-
+
boolean dateSeen = false, intSeen = false, floatSeen = false,
- tupleSeen = false, objectSeen = false, writableSeen = false;
+ tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
String name = field.field.getName();
@@ -330,6 +352,13 @@ public class PojoTypeExtractionTest {
writableSeen = true;
Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
+ } else if(name.equals("collection")) {
+ if(collectionSeen) {
+ Assert.fail("already seen");
+ }
+ collectionSeen = true;
+ Assert.assertEquals(new GenericTypeInfo(List.class), field.type);
+
} else {
Assert.fail("field "+field+" is not expected");
}
@@ -340,29 +369,29 @@ public class PojoTypeExtractionTest {
Assert.assertTrue("Field was not present", tupleSeen);
Assert.assertTrue("Field was not present", objectSeen);
Assert.assertTrue("Field was not present", writableSeen);
-
+ Assert.assertTrue("Field was not present", collectionSeen);
+
TypeInformation<?> typeAtOne = pojoType.getTypeAt(1); // int count
Assert.assertTrue(typeAtOne instanceof BasicTypeInfo);
-
+
Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
Assert.assertEquals(typeInfo.getArity(), 2);
}
// Kryo is required for this, so disable for now.
- @Ignore
@Test
public void testPojoAllPublic() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
checkAllPublicAsserts(typeForClass);
-
+
TypeInformation<?> typeForObject = TypeExtractor.getForObject(new AllPublic() );
checkAllPublicAsserts(typeForObject);
}
-
+
private void checkAllPublicAsserts(TypeInformation<?> typeInformation) {
Assert.assertTrue(typeInformation instanceof PojoTypeInfo);
- Assert.assertEquals(9, typeInformation.getArity());
- Assert.assertEquals(11, typeInformation.getTotalFields());
+ Assert.assertEquals(10, typeInformation.getArity());
+ Assert.assertEquals(12, typeInformation.getTotalFields());
// check if the three additional fields are identified correctly
boolean arrayListSeen = false, multisetSeen = false, strArraySeen = false;
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
@@ -390,9 +419,9 @@ public class PojoTypeExtractionTest {
strArraySeen = true;
Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
Assert.assertEquals(String[].class, field.type.getTypeClass());
- } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen").contains(name)) {
+ } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
// ignore these, they are inherited from the ComplexNestedClass
- }
+ }
else {
Assert.fail("field "+field+" is not expected");
}
@@ -401,18 +430,18 @@ public class PojoTypeExtractionTest {
Assert.assertTrue("Field was not present", multisetSeen);
Assert.assertTrue("Field was not present", strArraySeen);
}
-
+
@Test
public void testPojoExtendingTuple() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(FromTuple.class);
checkFromTuplePojo(typeForClass);
-
+
FromTuple ft = new FromTuple();
ft.f0 = ""; ft.f1 = ""; ft.f2 = 0L;
TypeInformation<?> typeForObject = TypeExtractor.getForObject(ft);
checkFromTuplePojo(typeForObject);
}
-
+
private void checkFromTuplePojo(TypeInformation<?> typeInformation) {
Assert.assertTrue(typeInformation instanceof PojoTypeInfo<?>);
Assert.assertEquals(4, typeInformation.getTotalFields());
@@ -431,7 +460,7 @@ public class PojoTypeExtractionTest {
}
}
}
-
+
@Test
public void testPojoWithGenerics() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ParentSettingGenerics.class);
@@ -453,13 +482,12 @@ public class PojoTypeExtractionTest {
}
}
}
-
+
/**
* Test if the TypeExtractor is accepting untyped generics,
* making them GenericTypes
*/
@Test
- @Ignore // kryo needed.
public void testPojoWithGenericsSomeFieldsGeneric() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(PojoWithGenerics.class);
Assert.assertTrue(typeForClass instanceof PojoTypeInfo<?>);
@@ -478,8 +506,8 @@ public class PojoTypeExtractionTest {
}
}
}
-
-
+
+
@Test
public void testPojoWithComplexHierarchy() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(ComplexHierarchyTop.class);
@@ -554,10 +582,10 @@ public class PojoTypeExtractionTest {
public VertexTyped() {
}
}
-
+
@Test
public void testGetterSetterWithVertex() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<VertexTyped> set = env.fromElements(new VertexTyped(0L, 3.0), new VertexTyped(1L, 1.0));
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
index cacc05b..d604105 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/AbstractGenericTypeSerializerTest.java
@@ -143,7 +143,7 @@ abstract public class AbstractGenericTypeSerializerTest {
}
}
- private final <T> void runTests(T... instances) {
+ protected final <T> void runTests(T... instances) {
if (instances == null || instances.length == 0) {
throw new IllegalArgumentException();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
index c6ef4db..37dba4e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeComparatorTest.java
@@ -25,4 +25,4 @@ public class KryoGenericTypeComparatorTest extends AbstractGenericTypeComparator
protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
return new KryoSerializer<T>(type);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index f6fc987..3c22b15 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -18,11 +18,53 @@
package org.apache.flink.api.java.typeutils.runtime;
+
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
+
+ @Test
+ public void testJavaList(){
+ Collection<Integer> a = new ArrayList<Integer>();
+
+ fillCollection(a);
+
+ runTests(a);
+ }
+
+ @Test
+ public void testJavaSet(){
+ Collection<Integer> b = new HashSet<Integer>();
+
+ fillCollection(b);
+
+ runTests(b);
+ }
+
+ @Test
+ public void testJavaDequeue(){
+ Collection<Integer> c = new LinkedList<Integer>();
+
+ fillCollection(c);
+
+ runTests(c);
+ }
+
+ private void fillCollection(Collection<Integer> coll){
+ coll.add(42);
+ coll.add(1337);
+ coll.add(49);
+ coll.add(1);
+ }
+
@Override
protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
return new KryoSerializer<T>(type);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index dab1bd5..5139fb5 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -95,6 +95,14 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
new file mode 100644
index 0000000..ddbe322
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.SerializerTestInstance
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.junit.Test
+
+import scala.reflect._
+
+class KryoGenericTypeSerializerTest {
+
+ @Test
+ def testScalaListSerialization: Unit = {
+ val a = List(42,1,49,1337)
+
+ runTests(a)
+ }
+
+ @Test
+ def testScalaMutablelistSerialization: Unit = {
+ val a = scala.collection.mutable.ListBuffer(42,1,49,1337)
+
+ runTests(a)
+ }
+
+ @Test
+ def testScalaMapSerialization: Unit = {
+ val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
+
+ runTests(a)
+ }
+
+ @Test
+ def testMutableMapSerialization: Unit ={
+ val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
+
+ runTests(a)
+ }
+
+ @Test
+ def testScalaListComplexTypeSerialization: Unit = {
+ val a = ComplexType("1234", 42, List(1,2,3,4))
+ val b = ComplexType("4321", 24, List(4,3,2,1))
+ val c = ComplexType("1337", 1, List(1))
+ val list = List(a, b, c)
+
+ runTests(list)
+ }
+
+ @Test
+ def testHeterogenousScalaList: Unit = {
+ val a = new DerivedType("foo", "bar")
+ val b = new BaseType("foobar")
+ val c = new DerivedType2("bar", "foo")
+ val list = List(a,b,c)
+
+ runTests(list)
+ }
+
+ case class ComplexType(id: String, number: Int, values: List[Int]){
+ override def equals(obj: Any): Boolean ={
+ if(obj != null && obj.isInstanceOf[ComplexType]){
+ val complexType = obj.asInstanceOf[ComplexType]
+ id.equals(complexType.id) && number.equals(complexType.number) && values.equals(
+ complexType.values)
+ }else{
+ false
+ }
+ }
+ }
+
+ class BaseType(val name: String){
+ override def equals(obj: Any): Boolean = {
+ if(obj != null && obj.isInstanceOf[BaseType]){
+ obj.asInstanceOf[BaseType].name.equals(name)
+ }else{
+ false
+ }
+ }
+ }
+
+ class DerivedType(name: String, val sub: String) extends BaseType(name){
+ override def equals(obj: Any): Boolean = {
+ if(obj != null && obj.isInstanceOf[DerivedType]){
+ super.equals(obj) && obj.asInstanceOf[DerivedType].sub.equals(sub)
+ }else{
+ false
+ }
+ }
+ }
+
+ class DerivedType2(name: String, val sub: String) extends BaseType(name){
+ override def equals(obj: Any): Boolean = {
+ if(obj != null && obj.isInstanceOf[DerivedType2]){
+ super.equals(obj) && obj.asInstanceOf[DerivedType2].sub.equals(sub)
+ }else{
+ false
+ }
+ }
+ }
+
+ def runTests[T : ClassTag](objects: T *): Unit ={
+ val clsTag = classTag[T]
+ val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
+ val serializer = typeInfo.createSerializer()
+ val typeClass = typeInfo.getTypeClass
+
+ val instance = new SerializerTestInstance[T](serializer, typeClass, -1, objects: _*)
+
+ instance.testAll()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 60a0d89..8994ba9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -53,7 +53,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class GroupReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 26;
+ private static int NUM_PROGRAMS = 28;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -763,7 +763,74 @@ public class GroupReduceITCase extends JavaProgramTestBase {
// return expected result
return "b\nccc\nee\n";
}
-
+
+ case 27: {
+ /*
+ * Test Java collections within pojos ( == test kryo)
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+ // f0.f0 is first integer
+ DataSet<String> reduceDs = ds.groupBy("key")
+ .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+ @Override
+ public void reduce(
+ Iterable<CollectionDataSets.PojoWithCollection> values,
+ Collector<String> out) throws Exception {
+ StringBuilder concat = new StringBuilder();
+ concat.append("call");
+ for(CollectionDataSets.PojoWithCollection value : values) {
+ concat.append("For key "+value.key+" we got: ");
+ for(CollectionDataSets.Pojo1 p :value.pojos) {
+ concat.append("pojo.a="+p.a);
+ }
+ }
+ out.collect(concat.toString());
+ }
+ });
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
+ }
+
+ case 28: {
+ /*
+ * Group by generic type
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(1);
+
+ DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
+ // f0.f0 is first integer
+ DataSet<String> reduceDs = ds.groupBy("bigInt")
+ .reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
+ @Override
+ public void reduce(
+ Iterable<CollectionDataSets.PojoWithCollection> values,
+ Collector<String> out) throws Exception {
+ StringBuilder concat = new StringBuilder();
+ concat.append("call");
+ for(CollectionDataSets.PojoWithCollection value : values) {
+ concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
+ }
+ out.collect(concat.toString());
+ }
+ });
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "call\n" +
+ "For key 92233720368547758070 we got:\n" +
+ "PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
+ "For key 92233720368547758070 we got:\n" +
+ "PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
+ }
+
default: {
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a70aa67a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1f812d9..895e996 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -18,11 +18,17 @@
package org.apache.flink.test.javaApiOperators.util;
+import java.io.File;
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
+import java.util.Map;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,6 +39,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.io.IntWritable;
+import scala.math.BigInt;
/**
* #######################################################################################################
@@ -496,6 +503,13 @@ public class CollectionDataSets {
public static class Pojo1 {
public String a;
public String b;
+
+ public Pojo1() {}
+
+ public Pojo1(String a, String b) {
+ this.a = a;
+ this.b = b;
+ }
}
public static class Pojo2 {
@@ -561,5 +575,68 @@ public class CollectionDataSets {
return env.fromCollection(data);
}
+ public static class PojoWithCollection {
+ public List<Pojo1> pojos;
+ public int key;
+ public java.sql.Date sqlDate;
+ public BigInteger bigInt;
+ public BigDecimal bigDecimalKeepItNull;
+ public BigInt scalaBigInt;
+ public List<Object> mixed;
+
+ @Override
+ public String toString() {
+ return "PojoWithCollection{" +
+ "pojos.size()=" + pojos.size() +
+ ", key=" + key +
+ ", sqlDate=" + sqlDate +
+ ", bigInt=" + bigInt +
+ ", bigDecimalKeepItNull=" + bigDecimalKeepItNull +
+ ", scalaBigInt=" + scalaBigInt +
+ ", mixed=" + mixed +
+ '}';
+ }
+ }
+
+ public static DataSet<PojoWithCollection> getPojoWithCollection(ExecutionEnvironment env) {
+ List<PojoWithCollection> data = new ArrayList<PojoWithCollection>();
+
+ List<Pojo1> pojosList1 = new ArrayList<Pojo1>();
+ pojosList1.add(new Pojo1("a", "aa"));
+ pojosList1.add(new Pojo1("b", "bb"));
+
+ List<Pojo1> pojosList2 = new ArrayList<Pojo1>();
+ pojosList2.add(new Pojo1("a2", "aa2"));
+ pojosList2.add(new Pojo1("b2", "bb2"));
+
+ PojoWithCollection pwc1 = new PojoWithCollection();
+ pwc1.pojos = pojosList1;
+ pwc1.key = 0;
+ pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ pwc1.scalaBigInt = BigInt.int2bigInt(10);
+ pwc1.bigDecimalKeepItNull = null;
+ pwc1.sqlDate = new java.sql.Date(2000000000000L); // 2033 ;)
+ pwc1.mixed = new ArrayList<Object>();
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
+ pwc1.mixed.add(map);
+ pwc1.mixed.add(new File("/this/is/wrong"));
+ pwc1.mixed.add("uhlala");
+
+ PojoWithCollection pwc2 = new PojoWithCollection();
+ pwc2.pojos = pojosList2;
+ pwc2.key = 0;
+ pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
+ pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
+ pwc2.bigDecimalKeepItNull = null;
+ pwc2.sqlDate = new java.sql.Date(200000000000L); // 1976
+
+
+ data.add(pwc1);
+ data.add(pwc2);
+
+ return env.fromCollection(data);
+ }
+
}