You are viewing a plain text version of this content. The canonical (HTML) link for it is available in the original archive page.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 16:49:14 UTC
[2/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser
(part 1)
[FLINK-9292] [core] Remove TypeInfoParser (part 1)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8fa8d02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8fa8d02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8fa8d02
Branch: refs/heads/master
Commit: c8fa8d025684c2225824c54a7285bbfdec7cfddc
Parents: 6ae81d4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri May 4 17:15:51 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 18:48:16 2018 +0200
----------------------------------------------------------------------
.../connectors/kafka/Kafka010ITCase.java | 6 +-
.../connectors/kafka/Kafka011ITCase.java | 6 +-
.../connectors/kafka/KafkaConsumerTestBase.java | 20 +-
.../kafka/KafkaShortRetentionTestBase.java | 5 +-
.../kafka/testutils/DataGenerators.java | 3 +-
.../java/typeutils/WritableInfoParserTest.java | 90 -----
.../api/java/typeutils/TypeInfoParser.java | 23 +-
.../java/typeutils/PojoTypeExtractionTest.java | 99 +++---
.../api/java/typeutils/TypeInfoParserTest.java | 340 -------------------
.../MultidimensionalArraySerializerTest.java | 7 +-
.../java/operators/SingleInputUdfOperator.java | 48 ---
.../api/java/operators/TwoInputUdfOperator.java | 48 ---
.../base/CoGroupOperatorCollectionTest.java | 14 +-
.../operators/base/GroupReduceOperatorTest.java | 85 ++---
.../operators/base/ReduceOperatorTest.java | 116 +++----
.../flink/api/java/TypeExtractionTest.java | 6 +-
.../java/type/lambdas/LambdaExtractionTest.java | 35 +-
.../operators/sort/LargeRecordHandlerTest.java | 14 +-
.../api/datastream/IterativeStream.java | 17 +-
.../datastream/SingleOutputStreamOperator.java | 45 ---
.../windowing/EvictingWindowOperatorTest.java | 47 +--
.../windowing/WindowOperatorMigrationTest.java | 52 +--
.../operators/windowing/WindowOperatorTest.java | 158 +++------
.../flink/test/manual/MassiveStringSorting.java | 4 +-
.../test/manual/MassiveStringValueSorting.java | 4 +-
.../flink/test/operators/OuterJoinITCase.java | 5 +-
.../flink/test/operators/TypeHintITCase.java | 3 +-
.../test/streaming/runtime/IterateITCase.java | 3 +-
28 files changed, 322 insertions(+), 981 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a038c8e..0855e92 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -204,7 +204,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
}
});
- final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+ final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner<Long>() {
private static final long serialVersionUID = -6730989584364230617L;
@@ -316,7 +316,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
long cnt = 0;
public LimitedLongDeserializer() {
- this.ti = TypeInfoParser.parse("Long");
+ this.ti = Types.LONG;
this.ser = ti.createSerializer(new ExecutionConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
index f48f87a..8692b5a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -213,7 +213,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
}
});
- final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
+ final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(Types.LONG, env.getConfig());
FlinkKafkaProducer011<Long> prod = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, Optional.of(new FlinkKafkaPartitioner<Long>() {
private static final long serialVersionUID = -6730989584364230617L;
@@ -327,7 +327,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
long cnt = 0;
public LimitedLongDeserializer() {
- this.ti = TypeInfoParser.parse("Long");
+ this.ti = Types.LONG;
this.ser = ti.createSerializer(new ExecutionConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 6ed9143..7d88f0d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
@@ -722,7 +721,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
- TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
+ TypeInformation<Tuple2<Long, String>> longStringType =
+ TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){});
TypeInformationSerializationSchema<Tuple2<Long, String>> sourceSchema =
new TypeInformationSerializationSchema<>(longStringType, env.getConfig());
@@ -1233,7 +1233,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
createTestTopic(topic, parallelism, 1);
- final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+ final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo =
+ TypeInformation.of(new TypeHint<Tuple2<Long, byte[]>>(){});
final TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema =
new TypeInformationSerializationSchema<>(longBytesInfo, new ExecutionConfig());
@@ -1581,7 +1582,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
env1.getConfig().disableSysoutLogging();
env1.disableOperatorChaining(); // let the source read everything into the network buffers
- TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig());
+ TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), env1.getConfig());
+
DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps));
fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() {
@Override
@@ -1686,7 +1689,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
final int finalCount;
int count = 0;
- TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+ TypeInformation<Tuple2<Integer, Integer>> ti = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){});
TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
public FixedNumberDeserializationSchema(int finalCount) {
@@ -1734,7 +1737,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
final int finalCount = finalCountTmp;
- final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+ final TypeInformation<Tuple2<Integer, Integer>> intIntTupleType =
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){});
final TypeInformationSerializationSchema<Tuple2<Integer, Integer>> deser =
new TypeInformationSerializationSchema<>(intIntTupleType, env.getConfig());
@@ -2234,7 +2238,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
private final TypeSerializer<Tuple2<Integer, Integer>> ts;
public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ ts = TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}).createSerializer(ec);
}
@Override
@@ -2251,7 +2255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
@Override
public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ return TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, String>>(){});
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 15d972f..f488325 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -218,13 +218,12 @@ public class KafkaShortRetentionTestBase implements Serializable {
@Override
public TypeInformation<String> getProducedType() {
- return TypeInfoParser.parse("String");
+ return Types.STRING;
}
}
/**
* Ensure that the consumer is properly failing if "auto.offset.reset" is set to "none".
- * @throws Exception
*/
public void runFailOnAutoOffsetResetNone() throws Exception {
final String topic = "auto-offset-reset-none-test";
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index e432a65..8c0d766 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -204,7 +203,7 @@ public class DataGenerators {
private static class MockStreamTransformation extends StreamTransformation<String> {
public MockStreamTransformation() {
- super("MockTransform", TypeInfoParser.<String>parse("String"), 1);
+ super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
deleted file mode 100644
index 7262bb7..0000000
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/typeutils/WritableInfoParserTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import org.apache.hadoop.io.Writable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Tests for the type information parsing of {@link Writable}.
- */
-public class WritableInfoParserTest {
-
- @Test
- public void testWritableType() {
- TypeInformation<?> ti = TypeInfoParser.parse(
- "Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>");
-
- Assert.assertTrue(ti instanceof WritableTypeInfo<?>);
- Assert.assertEquals(MyWritable.class, ((WritableTypeInfo<?>) ti).getTypeClass());
- }
-
- @Test
- public void testPojoWithWritableType() {
- TypeInformation<?> ti = TypeInfoParser.parse(
- "org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyPojo<"
- + "basic=Integer,"
- + "tuple=Tuple2<String, Integer>,"
- + "hadoopCitizen=Writable<org.apache.flink.api.java.typeutils.WritableInfoParserTest$MyWritable>,"
- + "array=String[]"
- + ">");
- Assert.assertTrue(ti instanceof PojoTypeInfo);
- PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
- Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
- Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
- Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
- Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
- }
- // ------------------------------------------------------------------------
- // Test types
- // ------------------------------------------------------------------------
-
- private static class MyWritable implements Writable {
-
- @Override
- public void write(DataOutput out) throws IOException {}
-
- @Override
- public void readFields(DataInput in) throws IOException {}
- }
-
- /**
- * Test Pojo containing a {@link Writable}.
- */
- public static class MyPojo {
- public Integer basic;
- public Tuple2<String, Integer> tuple;
- public MyWritable hadoopCitizen;
- public String[] array;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 12a9ae0..bb74e70 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.typeutils;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Value;
+ import org.apache.flink.annotation.Public;
+ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+ import org.apache.flink.api.common.typeinfo.TypeInformation;
+ import org.apache.flink.types.Value;
+
+ import java.lang.reflect.Field;
+ import java.util.ArrayList;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
/**
* @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
index ba09786..54127be 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java
@@ -18,35 +18,52 @@
package org.apache.flink.api.java.typeutils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyValue;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+
/**
- * Pojo Type tests
+ * Pojo Type tests.
*
- * A Pojo is a bean-style class with getters, setters and empty ctor
- * OR a class with all fields public (or for every private field, there has to be a public getter/setter)
- * everything else is a generic type (that can't be used for field selection)
+ * <p>Pojo is a bean-style class with getters, setters and empty ctor
+ * OR a class with all fields public (or for every private field, there has to be a public getter/setter)
+ * everything else is a generic type (that can't be used for field selection)
*/
public class PojoTypeExtractionTest {
+ /**
+ * Simple test type that implements the {@link Value} interface.
+ */
+ public static class MyValue implements Value {
+ private static final long serialVersionUID = 8607223484689147046L;
+
+ @Override
+ public void write(DataOutputView out) throws IOException {}
+
+ @Override
+ public void read(DataInputView in) throws IOException {}
+ }
+
public static class HasDuplicateField extends WC {
@SuppressWarnings("unused")
private int count; // duplicate
@@ -568,13 +585,13 @@ public class PojoTypeExtractionTest {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference1() {
- MapFunction<?, ?> function = new MyMapper<String>();
+ MyMapper<String> function = new MyMapper<>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=Long,field2=String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoWithGenerics<Long, String>>(){}));
Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
@@ -611,10 +628,12 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference2() {
- MapFunction<?, ?> function = new MyMapper2<Boolean, Character>();
+ MyMapper2<Boolean, Character> function = new MyMapper2<>();
+
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<Tuple2<Character,Boolean>>(){}));
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("Tuple2<Character,Boolean>"));
Assert.assertTrue(ti instanceof PojoTypeInfo<?>);
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
for(int i = 0; i < pti.getArity(); i++) {
@@ -646,10 +665,11 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference3() {
- MapFunction<?, ?> function = new MyMapper3<Boolean, Character>();
+ MyMapper3<Boolean, Character> function = new MyMapper3<>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoTuple<extraField=char,f0=boolean,f1=boolean,f2=long>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoTuple<Character, Boolean, Boolean>>(){}));
Assert.assertTrue(ti instanceof TupleTypeInfo<?>);
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -672,10 +692,11 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference4() {
- MapFunction<?, ?> function = new MyMapper4<Byte>();
+ MyMapper4<Byte> function = new MyMapper4<>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields1<field=Tuple2<byte,byte>>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoWithParameterizedFields1<Byte>>(){}));
Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
}
@@ -694,12 +715,11 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference5() {
- MapFunction<?, ?> function = new MyMapper5<Byte>();
+ MyMapper5<Byte> function = new MyMapper5<>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields2<"
- + "field=org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithGenerics<key=int,field1=byte,field2=byte>"
- + ">"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoWithParameterizedFields2<Byte>>(){}));
Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, ti);
}
@@ -718,12 +738,11 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference6() {
- MapFunction<?, ?> function = new MyMapper6<Integer>();
+ MyMapper6<Integer> function = new MyMapper6<>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields3<"
- + "field=int[]"
- + ">"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoWithParameterizedFields3<Integer>>(){}));
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
@@ -742,12 +761,12 @@ public class PojoTypeExtractionTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testGenericPojoTypeInference7() {
- MapFunction<?, ?> function = new MyMapper7<Integer>();
+ MyMapper7<Integer> function = new MyMapper7<>();
+
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+ function,
+ TypeInformation.of(new TypeHint<PojoWithParameterizedFields4<Integer>>(){}));
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation)
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.PojoTypeExtractionTest$PojoWithParameterizedFields4<"
- + "field=Tuple1<int>[]"
- + ">"));
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
deleted file mode 100644
index 4570f50..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.junit.Assert;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.CharValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.StringValue;
-
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class TypeInfoParserTest {
-
- @Test
- public void testBasicTypes() {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("Integer"));
- Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("Double"));
- Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("Byte"));
- Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("Float"));
- Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("Short"));
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("Long"));
- Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("Character"));
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("String"));
- Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("Boolean"));
- Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("Void"));
- Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("Date"));
- Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("BigInteger"));
- Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("BigDecimal"));
-
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer"));
- Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("java.lang.Double"));
- Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte"));
- Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("java.lang.Float"));
- Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("java.lang.Short"));
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("java.lang.Long"));
- Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("java.lang.Character"));
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInfoParser.parse("java.lang.String"));
- Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean"));
- Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("java.lang.Void"));
- Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("java.util.Date"));
- Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("java.math.BigInteger"));
- Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("java.math.BigDecimal"));
-
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("int"));
- Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("double"));
- Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, TypeInfoParser.parse("byte"));
- Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, TypeInfoParser.parse("float"));
- Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, TypeInfoParser.parse("short"));
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, TypeInfoParser.parse("long"));
- Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, TypeInfoParser.parse("char"));
- Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("boolean"));
- Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("void"));
- }
-
- @Test
- public void testValueTypes() {
- helperValueType("StringValue", StringValue.class);
- helperValueType("IntValue", IntValue.class);
- helperValueType("ByteValue", ByteValue.class);
- helperValueType("ShortValue", ShortValue.class);
- helperValueType("CharValue", CharValue.class);
- helperValueType("DoubleValue", DoubleValue.class);
- helperValueType("FloatValue", FloatValue.class);
- helperValueType("LongValue", LongValue.class);
- helperValueType("BooleanValue", BooleanValue.class);
- helperValueType("ListValue", ListValue.class);
- helperValueType("MapValue", MapValue.class);
- helperValueType("NullValue", NullValue.class);
- }
-
- private static void helperValueType(String str, Class<?> clazz) {
- TypeInformation<?> ti = TypeInfoParser.parse(str);
- Assert.assertTrue(ti instanceof ValueTypeInfo);
- ValueTypeInfo<?> vti = (ValueTypeInfo<?>) ti;
- Assert.assertEquals(clazz, vti.getTypeClass());
- }
-
-
- @Test
- @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes")
- public void testBasicArrays() {
- Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Integer[]"));
- Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Double[]"));
- Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("Byte[]"));
- Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Float[]"));
- Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("Short[]"));
- Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("Long[]"));
- Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("Character[]"));
- Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("String[]"));
- Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("Boolean[]"));
-
- Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer[]"));
- Assert.assertEquals(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Double[]"));
- Assert.assertEquals(BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Byte[]"));
- Assert.assertEquals(BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Float[]"));
- Assert.assertEquals(BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Short[]"));
- Assert.assertEquals(BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Long[]"));
- Assert.assertEquals(BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Character[]"));
- Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.String[]"));
- Assert.assertEquals(BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean[]"));
-
- Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("int[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("double[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("byte[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("float[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("short[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("long[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("char[]"));
- Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, TypeInfoParser.parse("boolean[]"));
- }
-
- @Test
- public void testTuples() {
- TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Long>");
- Assert.assertEquals(2, ti.getArity());
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(0));
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1));
-
- ti = TypeInfoParser.parse("Tuple0");
- Assert.assertEquals(0, ti.getArity());
- Assert.assertEquals("Java Tuple0", ti.toString());
-
- ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple0");
- Assert.assertEquals(0, ti.getArity());
- Assert.assertEquals("Java Tuple0", ti.toString());
-
- ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>");
- Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString());
- }
-
- @Test
- public void testGenericType() {
- TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class");
- Assert.assertTrue(ti instanceof GenericTypeInfo);
- Assert.assertEquals(Class.class, ((GenericTypeInfo<?>) ti).getTypeClass());
- }
-
- public static class MyValue implements Value {
- private static final long serialVersionUID = 8607223484689147046L;
-
- @Override
- public void write(DataOutputView out) throws IOException {}
-
- @Override
- public void read(DataInputView in) throws IOException {}
- }
-
- public static class MyPojo {
- public Integer basic;
- public Tuple2<String, Integer> tuple;
- public Value valueType;
- public String[] array;
- }
-
- @Test
- public void testPojoType() {
- TypeInformation<?> ti = TypeInfoParser.parse(
- "org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<"
- + "basic=Integer,"
- + "tuple=Tuple2<String, Integer>,"
- + "valueType=org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue,"
- + "array=String[]"
- + ">");
- Assert.assertTrue(ti instanceof PojoTypeInfo);
- PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
- Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
- Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
- Assert.assertEquals("tuple", pti.getPojoFieldAt(2).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof TupleTypeInfo);
- Assert.assertEquals("valueType", pti.getPojoFieldAt(3).getField().getName());
-
-// this currently fails but should not
-// Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof ValueTypeInfo);
- }
-
- @Test
- public void testPojoType2() {
- TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<String,Tuple2<Integer,org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>>>");
- Assert.assertTrue(ti instanceof TupleTypeInfo);
- TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
- Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo);
- Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo);
- TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>)(Object)tti.getTypeAt(1);
- Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo);
- Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
- PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1);
- Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo);
- }
-
- @Test
- public void testObjectArrays() {
- TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]");
-
- Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
- Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
-
- TypeInformation<?> ti2 = TypeInfoParser.parse("Tuple2<Integer,Double>[]");
-
- Assert.assertTrue(ti2 instanceof ObjectArrayTypeInfo<?, ?>);
- Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo);
-
- TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]");
- Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
- }
-
- @Test
- public void testLargeMixedTuple() {
- TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]");
- Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<StringValue>, Java Tuple1<Integer>>>", ti.toString());
- }
-
- public static enum MyEnum {
- ONE, TWO, THREE
- }
-
- @Test
- public void testEnumType() {
- TypeInformation<?> ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
- Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti.toString());
-
- TypeInformation<?> ti2 = TypeInfoParser.parse("java.lang.Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>");
- Assert.assertEquals("EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>", ti2.toString());
- }
-
- @Test
- public void testException() {
- try {
- TypeInfoParser.parse("THIS_CLASS_DOES_NOT_EXIST");
- Assert.fail("exception expected");
- } catch (IllegalArgumentException e) {
- // right
- }
-
- try {
- TypeInfoParser.parse("Tuple2<Integer>");
- Assert.fail("exception expected");
- } catch (IllegalArgumentException e) {
- // right
- }
-
- try {
- TypeInfoParser.parse("Tuple3<Integer,,>");
- Assert.fail("exception expected");
- } catch (IllegalArgumentException e) {
- // right
- }
-
- try {
- TypeInfoParser.parse("Tuple1<Integer,Double>");
- Assert.fail("exception expected");
- } catch (IllegalArgumentException e) {
- // right
- }
- }
-
- @Test
- public void testMultiDimensionalArray() {
- // tuple
- TypeInformation<?> ti = TypeInfoParser.parse("Tuple2<Integer, Double>[][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString());
-
- // pojos
- ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo<basic=String>[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
- + "PojoType<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyPojo, fields = [basic: String]>"
- + ">>>", ti.toString());
-
- // basic types
- ti = TypeInfoParser.parse("Float[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Float>>>", ti.toString());
- ti = TypeInfoParser.parse("String[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<String>>>", ti.toString());
- ti = TypeInfoParser.parse("Date[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Date>>>", ti.toString());
-
- // primitive types
- ti = TypeInfoParser.parse("int[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString());
- ti = TypeInfoParser.parse("boolean[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<boolean[]>>", ti.toString());
-
- // value types
- ti = TypeInfoParser.parse("IntValue[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<ValueType<IntValue>>>>", ti.toString());
-
- // value types
- ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyValue[][][]");
-
- // this fails because value types are parsed in a wrong way
-// Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
-// + "ValueType<TypeInfoParserTest$MyValue>"
-// + ">>>", ti.toString());
-
-
-
- // enum types
- ti = TypeInfoParser.parse("Enum<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>[][][]");
- Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
- + "EnumTypeInfo<org.apache.flink.api.java.typeutils.TypeInfoParserTest$MyEnum>"
- + ">>>", ti.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
index 1c97816..904d1b3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/MultidimensionalArraySerializerTest.java
@@ -19,10 +19,11 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+
import org.junit.Test;
@@ -111,7 +112,9 @@ public class MultidimensionalArraySerializerTest {
MyGenericPojo<String>[][] array = (MyGenericPojo<String>[][]) new MyGenericPojo[][]{
{ new MyGenericPojo<String>(new String[][]{{"a", "b"},{"c", "d"}}), null}
};
- TypeInformation ti = TypeInfoParser.parse("org.apache.flink.api.java.typeutils.runtime.MultidimensionalArraySerializerTest$MyGenericPojo<field=String[][]>[][]");
+
+ TypeInformation<MyGenericPojo<String>[][]> ti =
+ TypeInformation.of(new TypeHint<MyGenericPojo<String>[][]>(){});
SerializerTestInstance testInstance = new SerializerTestInstance(ti.createSerializer(new ExecutionConfig()), MyGenericPojo[][].class, -1, (Object) array);
testInstance.testAll();
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index 4ce44aa..e98a822 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.SemanticProperties;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import java.lang.annotation.Annotation;
@@ -261,52 +259,6 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
return returnType;
}
- /**
- * Adds a type information hint about the return type of this operator.
- *
- *
- * <p>Type hints are important in cases where the Java compiler
- * throws away generic type information necessary for efficient execution.
- *
- *
- * <p>This method takes a type information string that will be parsed. A type information string can contain the following
- * types:
- *
- * <ul>
- * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
- * <li>Basic type arrays such as <code>Integer[]</code>,
- * <code>String[]</code>, etc.
- * <li>Tuple types such as <code>Tuple1<TYPE0></code>,
- * <code>Tuple2<TYPE0, TYPE1></code>, etc.</li>
- * <li>Pojo types such as <code>org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1></code>, etc.</li>
- * <li>Generic types such as <code>java.lang.Class</code>, etc.
- * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
- * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
- * <li>Value types such as <code>DoubleValue</code>,
- * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
- * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], etc.</code></li>
- * <li>Writable types such as <code>Writable<org.my.CustomWritable></code></li>
- * <li>Enum types such as <code>Enum<org.my.CustomEnum></code></li>
- * </ul>
- *
- * <p>Example:
- * <code>"Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"</code>
- *
- * @param typeInfoString
- * type information string to be parsed
- * @return This operator with a given return type hint.
- *
- * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
- */
- @Deprecated
- @PublicEvolving
- public O returns(String typeInfoString) {
- if (typeInfoString == null) {
- throw new IllegalArgumentException("Type information string must not be null.");
- }
- return returns(TypeInfoParser.<OUT>parse(typeInfoString));
- }
-
// --------------------------------------------------------------------------------------------
// Accessors
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index b78d17e..a7d9506 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.DualInputSemanticProperties;
@@ -30,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import java.lang.annotation.Annotation;
@@ -329,52 +327,6 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
return returnType;
}
- /**
- * Adds a type information hint about the return type of this operator.
- *
- *
- * <p>Type hints are important in cases where the Java compiler
- * throws away generic type information necessary for efficient execution.
- *
- *
- * <p>This method takes a type information string that will be parsed. A type information string can contain the following
- * types:
- *
- * <ul>
- * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
- * <li>Basic type arrays such as <code>Integer[]</code>,
- * <code>String[]</code>, etc.
- * <li>Tuple types such as <code>Tuple1<TYPE0></code>,
- * <code>Tuple2<TYPE0, TYPE1></code>, etc.</li>
- * <li>Pojo types such as <code>org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1></code>, etc.</li>
- * <li>Generic types such as <code>java.lang.Class</code>, etc.
- * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
- * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
- * <li>Value types such as <code>DoubleValue</code>,
- * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
- * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], etc.</code></li>
- * <li>Writable types such as <code>Writable<org.my.CustomWritable></code></li>
- * <li>Enum types such as <code>Enum<org.my.CustomEnum></code></li>
- * </ul>
- *
- * <p>Example:
- * <code>"Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"</code>
- *
- * @param typeInfoString
- * type information string to be parsed
- * @return This operator with a given return type hint.
- *
- * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
- */
- @Deprecated
- @PublicEvolving
- public O returns(String typeInfoString) {
- if (typeInfoString == null) {
- throw new IllegalArgumentException("Type information string must not be null.");
- }
- return returns(TypeInfoParser.<OUT>parse(typeInfoString));
- }
-
// --------------------------------------------------------------------------------------------
// Accessors
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index e1e393b..79591cf 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -26,9 +26,10 @@ import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -183,14 +184,11 @@ public class CoGroupOperatorCollectionTest implements Serializable {
Tuple2<String, Integer>>> getCoGroupOperator(
RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
- return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>,
- CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
+ TypeInformation<Tuple2<String, Integer>> tuple2Info = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
+ return new CoGroupOperatorBase<>(
udf,
- new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
- TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
- TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
- TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")
- ),
+ new BinaryOperatorInformation<>(tuple2Info, tuple2Info, tuple2Info),
new int[]{0},
new int[]{0},
"coGroup on Collections"
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index d788efd..b51b52d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -20,16 +20,15 @@ package org.apache.flink.api.common.operators.base;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Collector;
@@ -41,7 +40,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
@@ -55,6 +53,9 @@ import static org.junit.Assert.fail;
@SuppressWarnings({"serial", "unchecked"})
public class GroupReduceOperatorTest implements java.io.Serializable {
+ private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
+ TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
@Test
public void testGroupReduceCollection() {
try {
@@ -79,18 +80,18 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
};
GroupReduceOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
- GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String,
- Integer>>> op = new GroupReduceOperatorBase<Tuple2<String, Integer>,
- Tuple2<String, Integer>, GroupReduceFunction<Tuple2<String, Integer>,
- Tuple2<String, Integer>>>(reducer, new UnaryOperatorInformation<Tuple2<String,
- Integer>, Tuple2<String, Integer>>(TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>"), TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>")), new int[]{0}, "TestReducer");
-
- List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
- Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
- Integer>("bar", 4)));
+ GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>> op =
+ new GroupReduceOperatorBase<>(
+ reducer,
+ new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+ new int[]{0},
+ "TestReducer");
+
+ List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+ new Tuple2<>("foo", 1),
+ new Tuple2<>("foo", 3),
+ new Tuple2<>("bar", 2),
+ new Tuple2<>("bar", 4)));
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableObjectReuse();
@@ -98,12 +99,12 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
executionConfig.enableObjectReuse();
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig);
- Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
- Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+ Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+ Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
- Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
- Integer>("bar", 6)));
+ Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+ new Tuple2<>("foo", 4),
+ new Tuple2<>("bar", 6)));
assertEquals(expectedResult, resultSetMutableSafe);
assertEquals(expectedResult, resultSetRegular);
@@ -156,18 +157,18 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
};
GroupReduceOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
- GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String,
- Integer>>> op = new GroupReduceOperatorBase<Tuple2<String, Integer>,
- Tuple2<String, Integer>, GroupReduceFunction<Tuple2<String, Integer>,
- Tuple2<String, Integer>>>(reducer, new UnaryOperatorInformation<Tuple2<String,
- Integer>, Tuple2<String, Integer>>(TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>"), TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>")), new int[]{0}, "TestReducer");
-
- List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
- Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
- Integer>("bar", 4)));
+ GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>> op =
+ new GroupReduceOperatorBase<>(
+ reducer,
+ new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+ new int[]{0},
+ "TestReducer");
+
+ List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+ new Tuple2<>("foo", 1),
+ new Tuple2<>("foo", 3),
+ new Tuple2<>("bar", 2),
+ new Tuple2<>("bar", 4)));
final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
@@ -175,25 +176,25 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
executionConfig.disableObjectReuse();
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskInfo, null, executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
new UnregisteredMetricsGroup()),
executionConfig);
executionConfig.enableObjectReuse();
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskInfo, null, executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
new UnregisteredMetricsGroup()),
executionConfig);
- Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
- Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+ Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+ Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
- Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
- Integer>("bar", 6)));
+ Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+ new Tuple2<>("foo", 4),
+ new Tuple2<>("bar", 6)));
assertEquals(expectedResult, resultSetMutableSafe);
assertEquals(expectedResult, resultSetRegular);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 396b78f..58a7b18 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -20,27 +20,27 @@ package org.apache.flink.api.common.operators.base;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Arrays.asList;
@@ -52,35 +52,29 @@ import static org.junit.Assert.fail;
* Tests for {@link ReduceOperator}.
*/
@SuppressWarnings({"serial", "unchecked"})
-public class ReduceOperatorTest implements java.io.Serializable {
+public class ReduceOperatorTest extends TestLogger implements Serializable{
+
+ private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE =
+ TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
@Test
public void testReduceCollection() {
try {
- final ReduceFunction<Tuple2<String, Integer>> reducer = new
- ReduceFunction<Tuple2<String, Integer>>() {
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws
- Exception {
- return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
- }
- };
-
- ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String,
- Integer>>> op = new ReduceOperatorBase<Tuple2<String, Integer>,
- ReduceFunction<Tuple2<String, Integer>>>(reducer,
- new UnaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String,
- Integer>>(TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>"),
- TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, " +
- "Integer>")), new int[]{0}, "TestReducer");
-
- List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
- Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
- Integer>("bar", 4)));
+ final ReduceFunction<Tuple2<String, Integer>> reducer =
+ (value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+
+ ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String, Integer>>> op =
+ new ReduceOperatorBase<>(
+ reducer,
+ new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+ new int[]{0},
+ "TestReducer");
+
+ List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+ new Tuple2<>("foo", 1),
+ new Tuple2<>("foo", 3),
+ new Tuple2<>("bar", 2),
+ new Tuple2<>("bar", 4)));
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.disableObjectReuse();
@@ -88,12 +82,12 @@ public class ReduceOperatorTest implements java.io.Serializable {
executionConfig.enableObjectReuse();
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig);
- Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
- Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+ Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+ Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
- Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
- Integer>("bar", 6)));
+ Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+ new Tuple2<>("foo", 4),
+ new Tuple2<>("bar", 6)));
assertEquals(expectedResult, resultSetMutableSafe);
assertEquals(expectedResult, resultSetRegular);
@@ -111,13 +105,14 @@ public class ReduceOperatorTest implements java.io.Serializable {
final AtomicBoolean opened = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();
- final ReduceFunction<Tuple2<String, Integer>> reducer = new
- RichReduceFunction<Tuple2<String, Integer>>() {
+ final ReduceFunction<Tuple2<String, Integer>> reducer = new RichReduceFunction<Tuple2<String, Integer>>() {
+
@Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws
- Exception {
- return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
+ public Tuple2<String, Integer> reduce(
+ Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+
+ return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
@Override
@@ -135,19 +130,18 @@ public class ReduceOperatorTest implements java.io.Serializable {
}
};
- ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String,
- Integer>>> op = new ReduceOperatorBase<Tuple2<String, Integer>,
- ReduceFunction<Tuple2<String, Integer>>>(reducer,
- new UnaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String,
- Integer>>(TypeInfoParser.<Tuple2<String,
- Integer>>parse("Tuple2<String, Integer>"),
- TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, " +
- "Integer>")), new int[]{0}, "TestReducer");
+ ReduceOperatorBase<Tuple2<String, Integer>, ReduceFunction<Tuple2<String, Integer>>> op =
+ new ReduceOperatorBase<>(
+ reducer,
+ new UnaryOperatorInformation<>(STRING_INT_TUPLE, STRING_INT_TUPLE),
+ new int[]{0},
+ "TestReducer");
- List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 1), new Tuple2<String,
- Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
- Integer>("bar", 4)));
+ List<Tuple2<String, Integer>> input = new ArrayList<>(asList(
+ new Tuple2<>("foo", 1),
+ new Tuple2<>("foo", 3),
+ new Tuple2<>("bar", 2),
+ new Tuple2<>("bar", 4)));
final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
@@ -156,25 +150,25 @@ public class ReduceOperatorTest implements java.io.Serializable {
executionConfig.disableObjectReuse();
List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input,
new RuntimeUDFContext(taskInfo, null, executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
new UnregisteredMetricsGroup()),
executionConfig);
executionConfig.enableObjectReuse();
List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
new RuntimeUDFContext(taskInfo, null, executionConfig,
- new HashMap<String, Future<Path>>(),
- new HashMap<String, Accumulator<?, ?>>(),
+ new HashMap<>(),
+ new HashMap<>(),
new UnregisteredMetricsGroup()),
executionConfig);
- Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
- Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
+ Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<>(resultMutableSafe);
+ Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<>(resultRegular);
- Set<Tuple2<String, Integer>> expectedResult = new HashSet<Tuple2<String,
- Integer>>(asList(new Tuple2<String, Integer>("foo", 4), new Tuple2<String,
- Integer>("bar", 6)));
+ Set<Tuple2<String, Integer>> expectedResult = new HashSet<>(asList(
+ new Tuple2<>("foo", 4),
+ new Tuple2<>("bar", 6)));
assertEquals(expectedResult, resultSetMutableSafe);
assertEquals(expectedResult, resultSetRegular);
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
index 22620c2..c8a9890 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TypeExtractionTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.common.typeinfo.Types;
import org.junit.Test;
@@ -47,9 +47,9 @@ public class TypeExtractionTest {
TypeInformation<?> info = ExecutionEnvironment.getExecutionEnvironment()
.fromElements("arbitrary", "data")
- .map(function).returns("String").getResultType();
+ .map(function).returns(Types.STRING).getResultType();
- assertEquals(TypeInfoParser.parse("String"), info);
+ assertEquals(Types.STRING, info);
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c8fa8d02/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
index d862f47..de1f395 100644
--- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,7 +40,6 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.junit.Assert;
import org.junit.Test;
@@ -57,6 +58,12 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class LambdaExtractionTest {
+ private static final TypeInformation<Tuple2<Tuple1<Integer>, Boolean>> NESTED_TUPLE_BOOLEAN_TYPE =
+ new TypeHint<Tuple2<Tuple1<Integer>, Boolean>>(){}.getTypeInfo();
+
+ private static final TypeInformation<Tuple2<Tuple1<Integer>, Double>> NESTED_TUPLE_DOUBLE_TYPE =
+ new TypeHint<Tuple2<Tuple1<Integer>, Double>>(){}.getTypeInfo();
+
@Test
public void testIdentifyLambdas() {
try {
@@ -126,7 +133,7 @@ public class LambdaExtractionTest {
@Test
public void testLambdaWithMemberVariable() {
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), TypeInfoParser.parse("Integer"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(new MyClass().getMapFunction(), Types.INT);
Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
}
@@ -138,7 +145,7 @@ public class LambdaExtractionTest {
MapFunction<Integer, String> f = (i) -> s + k + j;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Integer"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, Types.INT);
Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO);
}
@@ -146,7 +153,7 @@ public class LambdaExtractionTest {
public void testMapLambda() {
MapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -159,7 +166,7 @@ public class LambdaExtractionTest {
public void testFlatMapLambda() {
FlatMapFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
- TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+ TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -172,7 +179,7 @@ public class LambdaExtractionTest {
public void testMapPartitionLambda() {
MapPartitionFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
- TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+ TypeInformation<?> ti = TypeExtractor.getMapPartitionReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -185,7 +192,7 @@ public class LambdaExtractionTest {
public void testGroupReduceLambda() {
GroupReduceFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i, o) -> {};
- TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+ TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -198,7 +205,7 @@ public class LambdaExtractionTest {
public void testFlatJoinLambda() {
FlatJoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
- TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+ TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -211,7 +218,7 @@ public class LambdaExtractionTest {
public void testJoinLambda() {
JoinFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
- TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+ TypeInformation<?> ti = TypeExtractor.getJoinReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -224,7 +231,7 @@ public class LambdaExtractionTest {
public void testCoGroupLambda() {
CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
- TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+ TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -237,7 +244,7 @@ public class LambdaExtractionTest {
public void testCrossLambda() {
CrossFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2) -> null;
- TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
+ TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(f, NESTED_TUPLE_BOOLEAN_TYPE, NESTED_TUPLE_DOUBLE_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -250,7 +257,7 @@ public class LambdaExtractionTest {
public void testKeySelectorLambda() {
KeySelector<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, String>> f = (i) -> null;
- TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"));
+ TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(f, NESTED_TUPLE_BOOLEAN_TYPE);
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -262,8 +269,8 @@ public class LambdaExtractionTest {
@SuppressWarnings("rawtypes")
@Test
public void testLambdaTypeErasure() {
- MapFunction<Tuple1, Tuple1> f = (i) -> null;
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeInfoParser.parse("Tuple1<String>"), null, true);
+ MapFunction<Tuple1<Integer>, Tuple1> f = (i) -> null;
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, new TypeHint<Tuple1<Integer>>(){}.getTypeInfo(), null, true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
}