You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/17 13:14:01 UTC
[1/4] flink git commit: [FLINK-9299] [docs] Fix errors in
ProcessWindowFunction documentation Java examples
Repository: flink
Updated Branches:
refs/heads/master c6fa05e56 -> 3df780902
[FLINK-9299] [docs] Fix errors in ProcessWindowFunction documentation Java examples
This closes #6001
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22531a87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22531a87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22531a87
Branch: refs/heads/master
Commit: 22531a87158a725804734b27e091ff6597686dd6
Parents: 9ddb978
Author: yanghua <ya...@gmail.com>
Authored: Sun May 13 16:27:11 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:50 2018 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/windows.md | 28 ++++++++++++++--------------
1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/22531a87/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 649bfe8..ad2b516 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -504,7 +504,7 @@ private static class AverageAggregate
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
- return accumulator.f0 / accumulator.f1;
+ return ((double) accumulator.f0) / accumulator.f1;
}
@Override
@@ -730,7 +730,7 @@ input
/* ... */
-public class MyProcessWindowFunction implements ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
+public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void process(String key, Context context, Iterable<Tuple<String, Long>> input, Collector<String> out) {
long count = 0;
@@ -778,7 +778,7 @@ The example shows a `ProcessWindowFunction` that counts the elements in a window
A `ProcessWindowFunction` can be combined with either a `ReduceFunction`, an `AggregateFunction`, or a `FoldFunction` to
incrementally aggregate elements as they arrive in the window.
When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result.
-This allows to incrementally compute windows while having access to the
+This allows it to incrementally compute windows while having access to the
additional window meta information of the `ProcessWindowFunction`.
<span class="label label-info">Note</span> You can also the legacy `WindowFunction` instead of
@@ -797,7 +797,7 @@ DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
@@ -810,7 +810,7 @@ private static class MyReduceFunction implements ReduceFunction<SensorReading> {
}
private static class MyProcessWindowFunction
- implements ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
+ extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
@@ -830,7 +830,7 @@ val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
@@ -856,11 +856,11 @@ the average.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-DataStream<Tuple2<String, Long> input = ...;
+DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
@@ -883,7 +883,7 @@ private static class AverageAggregate
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
- return accumulator.f0 / accumulator.f1;
+ return ((double) accumulator.f0) / accumulator.f1;
}
@Override
@@ -893,7 +893,7 @@ private static class AverageAggregate
}
private static class MyProcessWindowFunction
- implements ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
+ extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
@@ -913,7 +913,7 @@ val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
@@ -959,7 +959,7 @@ DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
@@ -975,7 +975,7 @@ private static class MyFoldFunction
}
private static class MyProcessWindowFunction
- implements ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
+ extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
@@ -995,7 +995,7 @@ val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
- .timeWindow(<window assigner>)
+ .timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
[3/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser
(part 2)
Posted by se...@apache.org.
[FLINK-9292] [core] Remove TypeInfoParser (part 2)
This changes #5970
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb978b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb978b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb978b
Branch: refs/heads/master
Commit: 9ddb978b04fb602609675936df3d0c6ff9b8519b
Parents: c6fa05e
Author: yanghua <ya...@gmail.com>
Authored: Wed May 9 17:13:06 2018 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:50 2018 +0200
----------------------------------------------------------------------
.../connectors/kafka/KafkaProducerTestBase.java | 4 +-
flink-core/pom.xml | 1 +
.../api/java/typeutils/TypeInfoParser.java | 417 -------------------
.../api/java/typeutils/TypeExtractorTest.java | 105 +++--
.../api/java/sca/UdfAnalyzerExamplesTest.java | 80 ++--
.../flink/api/java/sca/UdfAnalyzerTest.java | 241 +++++------
.../graph/asm/dataset/ChecksumHashCodeTest.java | 5 +-
.../flink/graph/asm/dataset/CollectTest.java | 5 +-
.../flink/graph/asm/dataset/CountTest.java | 5 +-
9 files changed, 226 insertions(+), 637 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5023a7e..629497e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.state.CheckpointListener;
@@ -116,7 +116,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
- TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+ TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){});
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index efd7b12..9826be0 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -177,6 +177,7 @@ under the License.
<!-- leaked constructor in TypeHint -->
<exclude>org.apache.flink.api.common.typeinfo.TypeHint</exclude>
+ <exclude>org.apache.flink.api.java.typeutils.TypeInfoParser</exclude>
</excludes>
</parameter>
</configuration>
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
deleted file mode 100644
index bb74e70..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ /dev/null
@@ -1,417 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
- import org.apache.flink.annotation.Public;
- import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
- import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
- import org.apache.flink.types.Value;
-
- import java.lang.reflect.Field;
- import java.util.ArrayList;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
-/**
- * @deprecated Use {@link org.apache.flink.api.common.typeinfo.Types} instead.
- */
-@Deprecated
-@Public
-public class TypeInfoParser {
- private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
- private static final String VALUE_PACKAGE = "org.apache.flink.types";
- private static final String WRITABLE_PACKAGE = "org.apache.hadoop.io";
-
- private static final Pattern tuplePattern = Pattern.compile("^(" + TUPLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?((Tuple[1-9][0-9]?)<|(Tuple0))");
- private static final Pattern writablePattern = Pattern.compile("^((" + WRITABLE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?Writable)<([^\\s,>]*)(,|>|$|\\[)");
- private static final Pattern enumPattern = Pattern.compile("^((java\\.lang\\.)?Enum)<([^\\s,>]*)(,|>|$|\\[)");
- private static final Pattern basicTypePattern = Pattern
- .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)");
- private static final Pattern basicTypeDatePattern = Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)");
- private static final Pattern basicTypeBigIntPattern = Pattern.compile("^((java\\.math\\.)?BigInteger)(,|>|$|\\[)");
- private static final Pattern basicTypeBigDecPattern = Pattern.compile("^((java\\.math\\.)?BigDecimal)(,|>|$|\\[)");
- private static final Pattern primitiveTypePattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)");
- private static final Pattern valueTypePattern = Pattern.compile("^((" + VALUE_PACKAGE.replaceAll("\\.", "\\\\.")
- + "\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)");
- private static final Pattern pojoGenericObjectPattern = Pattern.compile("^([^\\s,<>\\[]+)(<)?");
- private static final Pattern fieldPattern = Pattern.compile("^([^\\s,<>\\[]+)=");
-
- /**
- * Generates an instance of <code>TypeInformation</code> by parsing a type
- * information string. A type information string can contain the following
- * types:
- *
- * <ul>
- * <li>Basic types such as <code>Integer</code>, <code>String</code>, etc.
- * <li>Basic type arrays such as <code>Integer[]</code>,
- * <code>String[]</code>, etc.
- * <li>Tuple types such as <code>Tuple1<TYPE0></code>,
- * <code>Tuple2<TYPE0, TYPE1></code>, etc.</li>
- * <li>Pojo types such as <code>org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1></code>, etc.</li>
- * <li>Generic types such as <code>java.lang.Class</code>, etc.
- * <li>Custom type arrays such as <code>org.my.CustomClass[]</code>,
- * <code>org.my.CustomClass$StaticInnerClass[]</code>, etc.
- * <li>Value types such as <code>DoubleValue</code>,
- * <code>StringValue</code>, <code>IntegerValue</code>, etc.</li>
- * <li>Tuple array types such as <code>Tuple2<TYPE0,TYPE1>[], etc.</code></li>
- * <li>Writable types such as <code>Writable<org.my.CustomWritable></code></li>
- * <li>Enum types such as <code>Enum<org.my.CustomEnum></code></li>
- * </ul>
- *
- * Example:
- * <code>"Tuple2<String,Tuple2<Integer,org.my.MyJob$Pojo<word=String>>>"</code>
- *
- * @param infoString
- * type information string to be parsed
- * @return <code>TypeInformation</code> representation of the string
- */
- @SuppressWarnings("unchecked")
- public static <X> TypeInformation<X> parse(String infoString) {
- try {
- if (infoString == null) {
- throw new IllegalArgumentException("String is null.");
- }
- String clearedString = infoString.replaceAll("\\s", "");
- if (clearedString.length() == 0) {
- throw new IllegalArgumentException("String must not be empty.");
- }
- StringBuilder sb = new StringBuilder(clearedString);
- TypeInformation<X> ti = (TypeInformation<X>) parse(sb);
- if (sb.length() > 0) {
- throw new IllegalArgumentException("String could not be parsed completely.");
- }
- return ti;
- } catch (Exception e) {
- throw new IllegalArgumentException("String could not be parsed: " + e.getMessage(), e);
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private static TypeInformation<?> parse(StringBuilder sb) throws ClassNotFoundException {
- String infoString = sb.toString();
- final Matcher tupleMatcher = tuplePattern.matcher(infoString);
-
- final Matcher writableMatcher = writablePattern.matcher(infoString);
- final Matcher enumMatcher = enumPattern.matcher(infoString);
-
- final Matcher basicTypeMatcher = basicTypePattern.matcher(infoString);
- final Matcher basicTypeDateMatcher = basicTypeDatePattern.matcher(infoString);
- final Matcher basicTypeBigIntMatcher = basicTypeBigIntPattern.matcher(infoString);
- final Matcher basicTypeBigDecMatcher = basicTypeBigDecPattern.matcher(infoString);
-
- final Matcher primitiveTypeMatcher = primitiveTypePattern.matcher(infoString);
-
- final Matcher valueTypeMatcher = valueTypePattern.matcher(infoString);
-
- final Matcher pojoGenericMatcher = pojoGenericObjectPattern.matcher(infoString);
-
- if (infoString.length() == 0) {
- return null;
- }
-
- TypeInformation<?> returnType = null;
- boolean isPrimitiveType = false;
-
- // tuples
- if (tupleMatcher.find()) {
- boolean isGenericTuple = true;
- String className = tupleMatcher.group(3);
- if(className == null) { // matched Tuple0
- isGenericTuple = false;
- className = tupleMatcher.group(2);
- sb.delete(0, className.length());
- } else {
- sb.delete(0, className.length() + 1); // +1 for "<"
- }
-
- if (infoString.startsWith(TUPLE_PACKAGE)) {
- sb.delete(0, TUPLE_PACKAGE.length() + 1); // +1 for trailing "."
- }
-
- int arity = Integer.parseInt(className.replaceAll("\\D", ""));
- Class<?> clazz = loadClass(TUPLE_PACKAGE + "." + className);
-
- TypeInformation<?>[] types = new TypeInformation<?>[arity];
- for (int i = 0; i < arity; i++) {
- types[i] = parse(sb);
- if (types[i] == null) {
- throw new IllegalArgumentException("Tuple arity does not match given parameters.");
- }
- }
- if (isGenericTuple) {
- if(sb.charAt(0) != '>') {
- throw new IllegalArgumentException("Tuple arity does not match given parameters.");
- }
- // remove '>'
- sb.deleteCharAt(0);
- }
- returnType = new TupleTypeInfo(clazz, types);
- }
- // writable types
- else if (writableMatcher.find()) {
- String className = writableMatcher.group(1);
- String fullyQualifiedName = writableMatcher.group(3);
- sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1);
- Class<?> clazz = loadClass(fullyQualifiedName);
- returnType = TypeExtractor.createHadoopWritableTypeInfo(clazz);
- }
- // enum types
- else if (enumMatcher.find()) {
- String className = enumMatcher.group(1);
- String fullyQualifiedName = enumMatcher.group(3);
- sb.delete(0, className.length() + 1 + fullyQualifiedName.length() + 1);
- Class<?> clazz = loadClass(fullyQualifiedName);
- returnType = new EnumTypeInfo(clazz);
- }
- // basic types
- else if (basicTypeMatcher.find()) {
- String className = basicTypeMatcher.group(1);
- sb.delete(0, className.length());
- Class<?> clazz;
- // check if fully qualified
- if (className.startsWith("java.lang")) {
- clazz = loadClass(className);
- } else {
- clazz = loadClass("java.lang." + className);
- }
- returnType = BasicTypeInfo.getInfoFor(clazz);
- }
- // special basic type "Date"
- else if (basicTypeDateMatcher.find()) {
- String className = basicTypeDateMatcher.group(1);
- sb.delete(0, className.length());
- Class<?> clazz;
- // check if fully qualified
- if (className.startsWith("java.util")) {
- clazz = loadClass(className);
- } else {
- clazz = loadClass("java.util." + className);
- }
- returnType = BasicTypeInfo.getInfoFor(clazz);
- }
- // special basic type "BigInteger"
- else if (basicTypeBigIntMatcher.find()) {
- String className = basicTypeBigIntMatcher.group(1);
- sb.delete(0, className.length());
- Class<?> clazz;
- // check if fully qualified
- if (className.startsWith("java.math")) {
- clazz = loadClass(className);
- } else {
- clazz = loadClass("java.math." + className);
- }
- returnType = BasicTypeInfo.getInfoFor(clazz);
- }
- // special basic type "BigDecimal"
- else if (basicTypeBigDecMatcher.find()) {
- String className = basicTypeBigDecMatcher.group(1);
- sb.delete(0, className.length());
- Class<?> clazz;
- // check if fully qualified
- if (className.startsWith("java.math")) {
- clazz = loadClass(className);
- } else {
- clazz = loadClass("java.math." + className);
- }
- returnType = BasicTypeInfo.getInfoFor(clazz);
- }
- // primitive types
- else if (primitiveTypeMatcher.find()) {
- String keyword = primitiveTypeMatcher.group(1);
- sb.delete(0, keyword.length());
-
- Class<?> clazz = null;
- if (keyword.equals("int")) {
- clazz = int.class;
- } else if (keyword.equals("byte")) {
- clazz = byte.class;
- } else if (keyword.equals("short")) {
- clazz = short.class;
- } else if (keyword.equals("char")) {
- clazz = char.class;
- } else if (keyword.equals("double")) {
- clazz = double.class;
- } else if (keyword.equals("float")) {
- clazz = float.class;
- } else if (keyword.equals("long")) {
- clazz = long.class;
- } else if (keyword.equals("boolean")) {
- clazz = boolean.class;
- } else if (keyword.equals("void")) {
- clazz = void.class;
- }
- returnType = BasicTypeInfo.getInfoFor(clazz);
- isPrimitiveType = true;
- }
- // values
- else if (valueTypeMatcher.find()) {
- String className = valueTypeMatcher.group(1);
- sb.delete(0, className.length() + 5);
-
- Class<?> clazz;
- // check if fully qualified
- if (className.startsWith(VALUE_PACKAGE)) {
- clazz = loadClass(className + "Value");
- } else {
- clazz = loadClass(VALUE_PACKAGE + "." + className + "Value");
- }
- returnType = ValueTypeInfo.getValueTypeInfo((Class<Value>) clazz);
- }
- // pojo objects or generic types
- else if (pojoGenericMatcher.find()) {
- String fullyQualifiedName = pojoGenericMatcher.group(1);
- sb.delete(0, fullyQualifiedName.length());
-
- boolean isPojo = pojoGenericMatcher.group(2) != null;
-
- // pojo
- if (isPojo) {
- sb.deleteCharAt(0);
- Class<?> clazz = loadClass(fullyQualifiedName);
-
- ArrayList<PojoField> fields = new ArrayList<PojoField>();
- while (sb.charAt(0) != '>') {
- final Matcher fieldMatcher = fieldPattern.matcher(sb);
- if (!fieldMatcher.find()) {
- throw new IllegalArgumentException("Field name missing.");
- }
- String fieldName = fieldMatcher.group(1);
- sb.delete(0, fieldName.length() + 1);
-
- Field field = TypeExtractor.getDeclaredField(clazz, fieldName);
- if (field == null) {
- throw new IllegalArgumentException("Field '" + fieldName + "'could not be accessed.");
- }
- fields.add(new PojoField(field, parse(sb)));
- }
- sb.deleteCharAt(0); // remove '>'
- returnType = new PojoTypeInfo(clazz, fields);
- }
- // generic type
- else {
- returnType = new GenericTypeInfo(loadClass(fullyQualifiedName));
- }
- }
-
- if (returnType == null) {
- throw new IllegalArgumentException("Error at '" + infoString + "'");
- }
-
- // arrays
- int arrayDimensionCount = 0;
- while (sb.length() > 1 && sb.charAt(0) == '[' && sb.charAt(1) == ']') {
- arrayDimensionCount++;
- sb.delete(0, 2);
- }
-
- if (sb.length() > 0 && sb.charAt(0) == '[') {
- throw new IllegalArgumentException("Closing square bracket missing.");
- }
-
- // construct multidimension array
- if (arrayDimensionCount > 0) {
- TypeInformation<?> arrayInfo = null;
-
- // first dimension
- // primitive array
- if (isPrimitiveType) {
- if (returnType == BasicTypeInfo.INT_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.BYTE_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.SHORT_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.CHAR_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.DOUBLE_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.FLOAT_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.LONG_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- arrayInfo = PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.VOID_TYPE_INFO) {
- throw new IllegalArgumentException("Can not create an array of void.");
- }
- }
- // basic array
- else if (returnType instanceof BasicTypeInfo
- && returnType != BasicTypeInfo.DATE_TYPE_INFO) {
- if (returnType == BasicTypeInfo.INT_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.BYTE_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.SHORT_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.CHAR_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.DOUBLE_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.FLOAT_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.LONG_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.STRING_TYPE_INFO) {
- arrayInfo = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;
- } else if (returnType == BasicTypeInfo.VOID_TYPE_INFO) {
- throw new IllegalArgumentException("Can not create an array of void.");
- }
- }
- // object array
- else {
- arrayInfo = ObjectArrayTypeInfo.getInfoFor(loadClass("[L" + returnType.getTypeClass().getName() + ";"),
- returnType);
- }
-
- // further dimensions
- if (arrayDimensionCount > 1) {
- String arrayPrefix = "[";
- for (int i = 1; i < arrayDimensionCount; i++) {
- arrayPrefix += "[";
- arrayInfo = ObjectArrayTypeInfo.getInfoFor(loadClass(arrayPrefix + "L" +
- returnType.getTypeClass().getName() + ";"), arrayInfo);
- }
- }
- returnType = arrayInfo;
- }
-
- // remove possible ','
- if (sb.length() > 0 && sb.charAt(0) == ',') {
- sb.deleteCharAt(0);
- }
-
- // check if end
- return returnType;
- }
-
- private static Class<?> loadClass(String fullyQualifiedName) {
- try {
- return Class.forName(fullyQualifiedName);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Class '" + fullyQualifiedName
- + "' could not be found. Please note that inner classes must be declared static.");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index 804cf88..b763f54 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
@@ -84,7 +85,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));
+ TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) Types.BOOLEAN);
Assert.assertTrue(ti.isBasicType());
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
@@ -113,7 +114,7 @@ public class TypeExtractorTest {
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple9<Integer, Long, Double, Float, Boolean, String, Character, Short, Byte>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(9, ti.getArity());
@@ -180,7 +181,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>"));
+ TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
Assert.assertTrue(ti instanceof TupleTypeInfo);
@@ -249,7 +250,7 @@ public class TypeExtractorTest {
};
TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(function,
- (TypeInformation) TypeInfoParser.parse("Tuple0"));
+ (TypeInformation) TypeInformation.of(new TypeHint<Tuple0>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(0, ti.getArity());
@@ -269,7 +270,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), (TypeInformation) TypeInfoParser.parse("String"));
+ TypeInformation<?> ti = TypeExtractor.getFlatJoinReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}), (TypeInformation) Types.STRING);
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -323,8 +324,8 @@ public class TypeExtractorTest {
};
TypeInformation<?> ti = TypeExtractor.getCrossReturnTypes(function,
- (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType"),
- (TypeInformation) TypeInfoParser.parse("Integer"));
+ (TypeInformation) TypeInformation.of(new TypeHint<CustomType>(){}),
+ (TypeInformation) Types.INT);
Assert.assertFalse(ti.isBasicType());
Assert.assertFalse(ti.isTupleType());
@@ -394,7 +395,7 @@ public class TypeExtractorTest {
};
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function,
- (TypeInformation) TypeInfoParser.parse("Tuple2<Long,org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType>"));
+ (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Long, CustomType>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -451,7 +452,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(function, (TypeInformation) TypeInfoParser.parse("StringValue"));
+ TypeInformation<?> ti = TypeExtractor.getKeySelectorTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<StringValue>(){}));
Assert.assertFalse(ti.isBasicType());
Assert.assertFalse(ti.isTupleType());
@@ -481,7 +482,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<StringValue, IntValue>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<StringValue, IntValue>>(){}));
Assert.assertFalse(ti.isBasicType());
Assert.assertTrue(ti.isTupleType());
@@ -520,7 +521,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Long, String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Long, String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -563,7 +564,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, Integer>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, Integer>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
@@ -605,7 +606,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
@@ -631,7 +632,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, Long, String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, Long, String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
@@ -656,11 +657,11 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true);
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING, "name", true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
try {
- TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+ TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
Assert.fail("Expected an exception");
}
catch (InvalidTypesException e) {
@@ -680,11 +681,11 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"), "name", true);
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING, "name", true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
try {
- TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+ TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
Assert.fail("Expected an exception");
}
catch (InvalidTypesException e) {
@@ -712,7 +713,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -744,7 +745,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, Tuple2<Integer, Integer>>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Tuple2<Integer, Integer>>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -783,7 +784,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>>(){}));
// Should be
// Tuple2<Boolean, Tuple2<Tuple2<Integer, Tuple2<Boolean, Boolean>>, Tuple2<Integer, Tuple2<Boolean, Boolean>>>>
@@ -823,11 +824,11 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"), "name", true);
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, Types.STRING, "name", true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
try {
- TypeExtractor.getMapReturnTypes(function, TypeInfoParser.parse("String"));
+ TypeExtractor.getMapReturnTypes(function, Types.STRING);
Assert.fail("Expected an exception");
}
catch (InvalidTypesException e) {
@@ -842,7 +843,7 @@ public class TypeExtractorTest {
private static final long serialVersionUID = 1L;
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.BOOLEAN);
Assert.assertTrue(ti.isBasicType());
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
@@ -906,7 +907,7 @@ public class TypeExtractorTest {
public void testFunctionDependingOnInputWithCustomTupleInput() {
IdentityMapper<SameTypeVariable<String>> function = new IdentityMapper<SameTypeVariable<String>>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -994,7 +995,7 @@ public class TypeExtractorTest {
public void testFunctionWithNoGenericSuperclass() {
RichMapFunction<?, ?> function = new Mapper2();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) Types.STRING);
Assert.assertTrue(ti.isBasicType());
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
@@ -1015,7 +1016,7 @@ public class TypeExtractorTest {
private static final long serialVersionUID = 1L;
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("DoubleValue"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<DoubleValue>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
@@ -1156,11 +1157,11 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"), "name", true);
+ TypeInformation<?> ti =TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInformation.of(new TypeHint<StringValue>(){}), "name", true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
try {
- TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInfoParser.parse("StringValue"));
+ TypeExtractor.getMapReturnTypes(function, (TypeInformation)TypeInformation.of(new TypeHint<StringValue>(){}));
Assert.fail("Expected an exception");
}
catch (InvalidTypesException e) {
@@ -1181,7 +1182,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(function, (TypeInformation) TypeInfoParser.parse("String[]"), (TypeInformation) TypeInfoParser.parse("String[]"));
+ TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<String[]>(){}), (TypeInformation) TypeInformation.of(new TypeHint<String[]>(){}));
Assert.assertFalse(ti.isBasicType());
Assert.assertFalse(ti.isTupleType());
@@ -1224,7 +1225,7 @@ public class TypeExtractorTest {
};
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function,
- (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomArrayObject[]"));
+ (TypeInformation) TypeInformation.of(new TypeHint<CustomArrayObject[]>(){}));
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
@@ -1242,7 +1243,7 @@ public class TypeExtractorTest {
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<String, String>[]"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, String>[]>(){}));
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
@@ -1262,7 +1263,7 @@ public class TypeExtractorTest {
public void testCustomArrayWithTypeVariable() {
RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = new IdentityMapper<CustomArrayObject2<Boolean>[]>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple1<Boolean>[]"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple1<Boolean>[]>(){}));
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
@@ -1287,7 +1288,7 @@ public class TypeExtractorTest {
private static final long serialVersionUID = 1L;
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean[]"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Boolean[]>(){}));
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?,?>);
ObjectArrayTypeInfo<?, ?> oati = (ObjectArrayTypeInfo<?, ?>) ti;
Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, oati.getComponentInfo());
@@ -1346,14 +1347,14 @@ public class TypeExtractorTest {
};
try {
- TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple2<Integer, String>"));
+ TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
Assert.fail("exception expected");
} catch (InvalidTypesException e) {
// right
}
try {
- TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Tuple3<String, String, String>"));
+ TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
Assert.fail("exception expected");
} catch (InvalidTypesException e) {
// right
@@ -1369,7 +1370,7 @@ public class TypeExtractorTest {
};
try {
- TypeExtractor.getMapReturnTypes(function2, (TypeInformation) TypeInfoParser.parse("IntValue"));
+ TypeExtractor.getMapReturnTypes(function2, (TypeInformation) TypeInformation.of(new TypeHint<IntValue>(){}));
Assert.fail("exception expected");
} catch (InvalidTypesException e) {
// right
@@ -1385,7 +1386,7 @@ public class TypeExtractorTest {
};
try {
- TypeExtractor.getMapReturnTypes(function3, (TypeInformation) TypeInfoParser.parse("Integer[]"));
+ TypeExtractor.getMapReturnTypes(function3, (TypeInformation) TypeInformation.of(new TypeHint<Integer[]>(){}));
Assert.fail("exception expected");
} catch (InvalidTypesException e) {
// right
@@ -1405,12 +1406,12 @@ public class TypeExtractorTest {
@Test
public void testTypeErasure() {
TypeInformation<?> ti = TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(),
- (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"), "name", true);
+ (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}), "name", true);
Assert.assertTrue(ti instanceof MissingTypeInfo);
try {
TypeExtractor.getFlatMapReturnTypes(new DummyFlatMapFunction<String, Integer, String, Boolean>(),
- (TypeInformation) TypeInfoParser.parse("Tuple2<String, Integer>"));
+ (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
Assert.fail("Expected an exception");
}
@@ -1571,7 +1572,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDuplicateValue() {
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValue<String>(), TypeInfoParser.parse("Tuple1<String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValue<String>(), TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1591,7 +1592,7 @@ public class TypeExtractorTest {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testDuplicateValueNested() {
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValueNested<String>(), TypeInfoParser.parse("Tuple1<Tuple1<String>>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) new DuplicateValueNested<String>(), TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1618,7 +1619,7 @@ public class TypeExtractorTest {
@Test
public void testInputInference1() {
EdgeMapper<String, Double> em = new EdgeMapper<String, Double>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<String, String, Double>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<String, String, Double>>(){}));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1641,7 +1642,7 @@ public class TypeExtractorTest {
@Test
public void testInputInference2() {
EdgeMapper2<Boolean> em = new EdgeMapper2<Boolean>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Boolean"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, Types.BOOLEAN);
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(3, ti.getArity());
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
@@ -1663,7 +1664,7 @@ public class TypeExtractorTest {
@Test
public void testInputInference3() {
EdgeMapper3<Boolean, String> em = new EdgeMapper3<Boolean, String>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<Boolean,Boolean,String>>(){}));
Assert.assertTrue(ti.isBasicType());
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
@@ -1681,7 +1682,7 @@ public class TypeExtractorTest {
@Test
public void testInputInference4() {
EdgeMapper4<Boolean, String> em = new EdgeMapper4<Boolean, String>();
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInfoParser.parse("Tuple3<Boolean,Boolean,String>[]"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) em, TypeInformation.of(new TypeHint<Tuple3<Boolean,Boolean,String>[]>(){}));
Assert.assertTrue(ti.isBasicType());
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
@@ -1762,7 +1763,7 @@ public class TypeExtractorTest {
return null;
}
};
- TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Tuple2<Integer, Double>[][]"));
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<Tuple2<Integer, Double>[][]>(){}));
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple2<Integer, Double>>>", ti.toString());
// primitive array
@@ -1775,7 +1776,7 @@ public class TypeExtractorTest {
return null;
}
};
- ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("int[][][]"));
+ ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<int[][][]>(){}));
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<int[]>>", ti.toString());
// basic array
@@ -1788,7 +1789,7 @@ public class TypeExtractorTest {
return null;
}
};
- ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInfoParser.parse("Integer[][][]"));
+ ti = TypeExtractor.getMapReturnTypes((MapFunction)function, TypeInformation.of(new TypeHint<Integer[][][]>(){}));
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<BasicArrayTypeInfo<Integer>>>", ti.toString());
// pojo array
@@ -1802,16 +1803,14 @@ public class TypeExtractorTest {
}
};
ti = TypeExtractor.getMapReturnTypes((MapFunction)function,
- TypeInfoParser.parse("org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType<"
- + "myField1=String,myField2=int"
- + ">[][][]"));
+ TypeInformation.of(new TypeHint<CustomType[][][]>(){}));
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<"
+ "PojoType<org.apache.flink.api.java.typeutils.TypeExtractorTest$CustomType, fields = [myField1: String, myField2: Integer]>"
+ ">>>", ti.toString());
// generic array
- ti = TypeExtractor.getMapReturnTypes((MapFunction) new MapperWithMultiDimGenericArray<String>(), TypeInfoParser.parse("String[][][]"));
+ ti = TypeExtractor.getMapReturnTypes((MapFunction) new MapperWithMultiDimGenericArray<String>(), TypeInformation.of(new TypeHint<String[][][]>(){}));
Assert.assertEquals("ObjectArrayTypeInfo<ObjectArrayTypeInfo<ObjectArrayTypeInfo<Java Tuple1<String>>>>", ti.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 1b0cbec..e7b87a9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
@@ -161,8 +164,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testEnumTrianglesBasicExamplesTriadBuilder() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, TriadBuilder.class,
- "Tuple2<Integer, Integer>",
- "Tuple3<Integer, Integer, Integer>",
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, Integer>>(){}),
new String[] { "0" });
}
@@ -180,8 +183,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testEnumTrianglesBasicExamplesTupleEdgeConverter() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, TupleEdgeConverter.class,
- "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
@@ -196,8 +199,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testEnumTrianglesOptExamplesEdgeDuplicator() {
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, EdgeDuplicator.class,
- "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
private static class DegreeCounter implements GroupReduceFunction<Edge, Edge> {
@@ -242,8 +245,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testEnumTrianglesOptExamplesDegreeCounter() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DegreeCounter.class,
- "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>",
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}),
new String[] { "0" });
}
@@ -321,8 +324,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testKMeansExamplesCentroidAccumulator() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, CentroidAccumulator.class,
- "Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
- "Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
+ TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
new String[] { "0" });
}
@@ -337,8 +340,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testKMeansExamplesCentroidAverager() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, CentroidAverager.class,
- "Tuple3<Integer, org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Point<x=double,y=double>, Long>",
- "org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$Centroid<x=double,y=double,id=int>");
+ TypeInformation.of(new TypeHint<Tuple3<Integer, Point, Long>>(){}),
+ TypeInformation.of(new TypeHint<Centroid>(){}));
}
// --------------------------------------------------------------------------------------------
@@ -360,8 +363,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testConnectedComponentsExamplesUndirectEdge() {
compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, UndirectEdge.class,
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFieldsFirst("*")
@@ -377,9 +380,9 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testConnectedComponentsExamplesComponentIdFilter() {
compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, ComponentIdFilter.class,
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFields("*->f0;*->f1")
@@ -393,8 +396,7 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testConnectedComponentsExamplesDuplicateValue() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, DuplicateValue.class,
- "Long",
- "Tuple2<Long, Long>");
+ Types.LONG, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFieldsFirst("f1->f1")
@@ -409,9 +411,9 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testConnectedComponentsExamplesNeighborWithComponentIDJoin() {
compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, NeighborWithComponentIDJoin.class,
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
// --------------------------------------------------------------------------------------------
@@ -435,9 +437,9 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testWebLogAnalysisExamplesAntiJoinVisits() {
compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
- "Tuple3<Integer, String, Integer>",
- "Tuple1<String>",
- "Tuple3<Integer, String, Integer>",
+ TypeInformation.of(new TypeHint<Tuple3<Integer, String, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple1<String>>(){}),
+ TypeInformation.of(new TypeHint<Tuple3<Integer, String, Integer>>(){}),
new String[] { "1" }, new String[] { "0" });
}
@@ -465,8 +467,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testPageRankBasicExamplesBuildOutgoingEdgeList() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, BuildOutgoingEdgeList.class,
- "Tuple2<Long, Long>",
- "Tuple2<Long, Long[]>",
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long[]>>(){}),
new String[] { "0" });
}
@@ -551,8 +553,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testLogisticRegressionExamplesSumGradient() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, SumGradient.class,
- "Tuple1<double>",
- "Tuple1<double>",
+ TypeInformation.of(new TypeHint<Tuple1<Double>>(){}),
+ TypeInformation.of(new TypeHint<Tuple1<Double>>(){}),
new String[] { "0" });
}
@@ -584,8 +586,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testLogisticRegressionExamplesPointParser() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
- "String",
- "Tuple2<Integer, double[]>");
+ Types.STRING,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, double[]>>(){}));
}
// --------------------------------------------------------------------------------------------
@@ -620,8 +622,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testCanopyExamplesMassageBOW() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, PointParser.class,
- "String",
- "Tuple2<Integer, String>");
+ Types.STRING,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}));
}
@ForwardedFields("0")
@@ -642,8 +644,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testCanopyExamplesDocumentReducer() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, DocumentReducer.class,
- "Tuple2<Integer, String>",
- "Tuple5<Integer, Boolean, Boolean, String, String>",
+ TypeInformation.of(new TypeHint<Tuple2<Integer, String>>(){}),
+ TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}),
new String[] { "0" });
}
@@ -669,8 +671,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testCanopyExamplesMapToCenter() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, MapToCenter.class,
- "Tuple5<Integer, Boolean, Boolean, String, String>",
- "Tuple5<Integer, Boolean, Boolean, String, String>");
+ TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}),
+ TypeInformation.of(new TypeHint<Tuple5<Integer, Boolean, Boolean, String, String>>(){}));
}
// --------------------------------------------------------------------------------------------
@@ -725,8 +727,8 @@ public class UdfAnalyzerExamplesTest {
@Test
public void testKMeansppExamplesRecordToDocConverter() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, RecordToDocConverter.class,
- "Tuple3<Integer, Integer, Double>",
- "org.apache.flink.api.java.sca.UdfAnalyzerExamplesTest$DocumentWithFreq<id=Integer,wordFreq=java.util.HashMap>",
+ TypeInformation.of(new TypeHint<Tuple3<Integer, Integer, Double>>(){}),
+ TypeInformation.of(new TypeHint<DocumentWithFreq>(){}),
new String[] { "0" });
}
}
[2/4] flink git commit: [FLINK-9292] [core] Remove TypeInfoParser
(part 2)
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index 7e88838..dcea16d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
@@ -41,7 +43,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -60,6 +62,10 @@ import static org.junit.Assert.assertEquals;
@SuppressWarnings("serial")
public class UdfAnalyzerTest {
+ private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
+
+ private static final TypeInformation<Tuple2<String, String>> STRING_STRING_TUPLE2_TYPE_INFO = TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
+
@ForwardedFields("f0->*")
private static class Map1 implements MapFunction<Tuple2<String, Integer>, String> {
public String map(Tuple2<String, Integer> value) throws Exception {
@@ -69,8 +75,8 @@ public class UdfAnalyzerTest {
@Test
public void testSingleFieldExtract() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class, "Tuple2<String,Integer>",
- "String");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map1.class,
+ STRING_INT_TUPLE2_TYPE_INFO, Types.STRING);
}
@ForwardedFields("f0->f0;f0->f1")
@@ -82,8 +88,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTuple() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class, "Tuple2<String,Integer>",
- "Tuple2<String,String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map2.class,
+ STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
}
private static class Map3 implements MapFunction<String[], Integer> {
@@ -96,7 +102,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithArrayAttrAccess() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class, "String[]", "Integer");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map3.class,
+ TypeInformation.of(new TypeHint<String[]>(){}), Types.INT);
}
private static class Map4 implements MapFunction<MyPojo, String> {
@@ -109,7 +116,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGenericTypePublicAttrAccess() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+ new GenericTypeInfo<>(MyPojo.class), Types.STRING);
}
@ForwardedFields("field2->*")
@@ -123,7 +130,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithPojoPublicAttrAccess() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map5.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+ TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING);
}
@ForwardedFields("field->*")
@@ -137,7 +144,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithPojoPrivateAttrAccess() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map6.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", "String");
+ TypeInformation.of(new TypeHint<MyPojo>(){}), Types.STRING);
}
@ForwardedFields("f0->f1")
@@ -153,8 +160,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithCondition() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class, "Tuple2<String,Integer>",
- "Tuple2<String,String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map7.class,
+ STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
}
private static class Map8 implements MapFunction<Tuple2<String, String>, String> {
@@ -169,8 +176,8 @@ public class UdfAnalyzerTest {
@Test
public void testSingleFieldExtractWithCondition() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class, "Tuple2<String,String>",
- "String");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map8.class,
+ STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
}
@ForwardedFields("*->f0")
@@ -185,7 +192,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithInstanceVar() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, "String", "Tuple1<String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map9.class, Types.STRING,
+ TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
}
@ForwardedFields("*->f0.f0")
@@ -200,8 +208,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithInstanceVar2() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, "String",
- "Tuple1<Tuple1<String>>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map10.class, Types.STRING,
+ TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
}
@ForwardedFields("*->f1")
@@ -222,8 +230,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithInstanceVarChangedByOtherMethod() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, "String",
- "Tuple2<String, String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map11.class, Types.STRING,
+ STRING_STRING_TUPLE2_TYPE_INFO);
}
@ForwardedFields("f0->f0.f0;f0->f1.f0")
@@ -236,8 +244,9 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoNestedTuple() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class, "Tuple2<String,Integer>",
- "Tuple2<Tuple1<String>,Tuple1<String>>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map12.class,
+ STRING_INT_TUPLE2_TYPE_INFO,
+ TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){}));
}
@ForwardedFields("f0->f1.f0")
@@ -253,8 +262,9 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoNestedTupleWithVarAndModification() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class, "Tuple2<String,Integer>",
- "Tuple2<Tuple1<String>,Tuple1<String>>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map13.class,
+ STRING_INT_TUPLE2_TYPE_INFO,
+ TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Tuple1<String>>>(){}));
}
@ForwardedFields("f0")
@@ -268,8 +278,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithAssignment() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2<String,Integer>",
- "Tuple2<String,String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class,
+ STRING_INT_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
}
@ForwardedFields("f0.f0->f0")
@@ -284,7 +294,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithInputPath() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map15.class,
- "Tuple2<Tuple1<String>,Integer>", "Tuple2<String,String>");
+ TypeInformation.of(new TypeHint<Tuple2<Tuple1<String>, Integer>>(){}),
+ STRING_STRING_TUPLE2_TYPE_INFO);
}
@ForwardedFields("field->field2;field2->field")
@@ -300,8 +311,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoPojoByGettersAndSetters() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map16.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ TypeInformation.of(new TypeHint<MyPojo>(){}), TypeInformation.of(new TypeHint<MyPojo>(){}));
}
private static class Map17 implements MapFunction<String, Tuple1<String>> {
@@ -319,7 +329,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoTupleWithInstanceVarAndCondition() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, "String", "Tuple1<String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map17.class, Types.STRING,
+ TypeInformation.of(new TypeHint<Tuple1<String>>(){}));
}
private static class Map18 implements MapFunction<Tuple1<String>, ArrayList<String>> {
@@ -333,8 +344,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoUnsupportedObject() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class, "Tuple1<String>",
- "java.util.ArrayList");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map18.class,
+ TypeInformation.of(new TypeHint<Tuple1<String>>(){}), TypeInformation.of(new TypeHint<java.util.ArrayList>(){}));
}
@ForwardedFields("*->f0")
@@ -351,7 +362,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithNewTupleToNewTupleAssignment() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, "Integer", "Tuple1<Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map19.class, Types.INT,
+ TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
}
@ForwardedFields("f0;f1")
@@ -371,7 +383,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGetMethod() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map20.class,
- "Tuple4<Integer, Integer, Integer, Integer>", "Tuple4<Integer, Integer, Integer, Integer>");
+ TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple4<Integer, Integer, Integer, Integer>>(){}));
}
@ForwardedFields("f0->f1;f1->f0")
@@ -387,8 +400,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithSetMethod() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class, "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map21.class,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
@ForwardedFields("f0->f1;f1->f0")
@@ -404,8 +417,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardIntoNewTupleWithSetMethod() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class, "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map22.class,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
@ForwardedFields("*")
@@ -426,8 +439,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGetMethod2() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class, "Tuple1<Integer>",
- "Tuple1<Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map23.class,
+ TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}), TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
}
private static class Map24 implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
@@ -442,8 +455,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithSetMethod2() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class, "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map24.class,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
@ForwardedFields("f1->f0;f1")
@@ -457,8 +470,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithModifiedInput() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class, "Tuple2<Integer, Integer>",
- "Tuple2<Integer, Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map25.class,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}), TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
@ForwardedFields("*->1")
@@ -482,8 +495,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithTuplesGetSetFieldMethods() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, "Integer",
- "Tuple2<Integer, Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map26.class, Types.INT,
+ TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>(){}));
}
@ForwardedFields("2->3;3->7")
@@ -515,8 +528,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithTuplesGetSetFieldMethods2() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map27.class,
- "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>",
- "Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>");
+ TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple8<Integer, Integer, Integer, Integer, Integer, Integer, Integer, Integer>>(){}));
}
private static class Map28 implements MapFunction<Integer, Integer> {
@@ -531,7 +544,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching1() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, "Integer", "Integer");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map28.class, Types.INT, Types.INT);
}
@ForwardedFields("0")
@@ -555,7 +568,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching2() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map29.class,
- "Tuple3<String, String, String>", "Tuple3<String, String, String>");
+ TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}),
+ TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
}
private static class Map30 implements MapFunction<Tuple2<String, String>, String> {
@@ -573,8 +587,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching3() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class, "Tuple2<String,String>",
- "String");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map30.class,
+ STRING_STRING_TUPLE2_TYPE_INFO, Types.STRING);
}
@ForwardedFields("1->1;1->0")
@@ -591,8 +605,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithInheritance() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class, "Tuple2<String,String>",
- "Tuple2<String,String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map31.class,
+ STRING_STRING_TUPLE2_TYPE_INFO, STRING_STRING_TUPLE2_TYPE_INFO);
}
@ForwardedFields("*")
@@ -620,8 +634,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithUnboxingAndBoxing() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map32.class,
- "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>",
- "Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>");
+ TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){}),
+ TypeInformation.of(new TypeHint<Tuple8<Boolean, Character, Byte, Short, Integer, Long, Float, Double>>(){}));
}
private static class Map33 implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -640,8 +654,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching4() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map33.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFields("1")
@@ -663,8 +677,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching5() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map34.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
private static class Map35 implements MapFunction<String[], Tuple2<String[], String[]>> {
@@ -678,8 +692,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithArrayModification() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, "String[]",
- "Tuple2<String[], String[]>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map35.class, TypeInformation.of(new TypeHint<String[]>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<String[], String[]>>(){}));
}
private static class Map36 implements MapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>> {
@@ -696,8 +710,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranching6() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, "Tuple3<String, String, String>",
- "Tuple3<String, String, String>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map36.class, TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}),
+ TypeInformation.of(new TypeHint<Tuple3<String, String, String>>(){}));
}
private static class Map37 implements MapFunction<Tuple1<Tuple1<String>>, Tuple1<Tuple1<String>>> {
@@ -711,8 +725,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGetAndModification() {
- compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, "Tuple1<Tuple1<String>>",
- "Tuple1<Tuple1<String>>");
+ compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map37.class, TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}),
+ TypeInformation.of(new TypeHint<Tuple1<Tuple1<String>>>(){}));
}
@ForwardedFields("field")
@@ -727,8 +741,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithInheritance2() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map38.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo2<field=String,field2=String>");
+ TypeInformation.of(new TypeHint<MyPojo2>(){}),
+ TypeInformation.of(new TypeHint<MyPojo2>(){}));
}
private static class Map39 implements MapFunction<MyPojo, MyPojo> {
@@ -743,8 +757,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGenericTypeOutput() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map39.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo");
+ TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){}),
+ TypeInformation.of(new TypeHint<GenericTypeInfo<MyPojo>>(){}));
}
@ForwardedFields("field2")
@@ -766,8 +780,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithRecursion() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map40.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ TypeInformation.of(new TypeHint<MyPojo>(){}));
}
@ForwardedFields("field;field2")
@@ -784,8 +798,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithGetRuntimeContext() {
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map41.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>");
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ TypeInformation.of(new TypeHint<MyPojo>(){}));
}
@ForwardedFields("*")
@@ -798,8 +812,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithCollector() {
- compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, "Tuple1<Integer>",
- "Tuple1<Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap1.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
}
@ForwardedFields("0->1;1->0")
@@ -817,8 +831,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWith2Collectors() {
- compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, "Tuple2<Long, Long>",
- "Tuple2<Long, Long>");
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
private static class FlatMap3 implements FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
@@ -835,8 +849,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithCollectorPassing() {
- compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, "Tuple1<Integer>",
- "Tuple1<Integer>");
+ compareAnalyzerResultWithAnnotationsSingleInput(FlatMapFunction.class, FlatMap3.class, TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}),
+ TypeInformation.of(new TypeHint<Tuple1<Integer>>(){}));
}
@ForwardedFieldsFirst("f1->f1")
@@ -850,8 +864,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithDualInput() {
- compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, "Tuple2<Long, Long>",
- "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+ compareAnalyzerResultWithAnnotationsDualInput(JoinFunction.class, Join1.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFieldsFirst("*")
@@ -866,8 +880,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithDualInputAndCollector() {
- compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, "Tuple2<Long, Long>",
- "Tuple2<Long, Long>", "Tuple2<Long, Long>");
+ compareAnalyzerResultWithAnnotationsDualInput(FlatJoinFunction.class, Join2.class, TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}),
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}));
}
@ForwardedFields("0")
@@ -881,7 +895,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithIterable() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce1.class,
- "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0" });
}
@ForwardedFields("1->0")
@@ -907,7 +921,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithIterable2() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce2.class,
- "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "0", "1" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "0", "1" });
}
@ForwardedFields("field2")
@@ -923,8 +937,8 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithIterable3() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce3.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>", new String[] { "field2" });
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ TypeInformation.of(new TypeHint<MyPojo>(){}), new String[] { "field2" });
}
@ForwardedFields("f0->*")
@@ -942,7 +956,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
}
@ForwardedFields("f0->*")
@@ -967,7 +981,7 @@ public class UdfAnalyzerTest {
public void testForwardWithAtLeastOneIterationAssumptionForJavac() {
// this test simulates javac behaviour in Eclipse IDE
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce4Javac.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
}
private static class GroupReduce5 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -987,7 +1001,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption2() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce5.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f1" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f1" });
}
private static class GroupReduce6 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1005,7 +1019,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption3() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce6.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
}
private static class GroupReduce7 implements GroupReduceFunction<Tuple2<Long, Long>, Long> {
@@ -1023,7 +1037,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption4() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce7.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
}
@ForwardedFields("f0->*")
@@ -1042,7 +1056,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption5() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce8.class,
- "Tuple2<Long, Long>", "Long", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.LONG, new String[] { "f0" });
}
@ForwardedFields("f0")
@@ -1061,7 +1075,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption6() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce9.class,
- "Tuple2<Long, Long>", "Tuple2<Long, Long>", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), new String[] { "f0" });
}
private static class GroupReduce10 implements GroupReduceFunction<Tuple2<Long, Long>, Boolean> {
@@ -1082,7 +1096,7 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithAtLeastOneIterationAssumption7() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(GroupReduceFunction.class, GroupReduce10.class,
- "Tuple2<Long, Long>", "Boolean", new String[] { "f0" });
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>(){}), Types.BOOLEAN, new String[] { "f0" });
}
@ForwardedFields("field")
@@ -1096,9 +1110,9 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithReduce() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce1.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- new String[] { "field" });
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ new String[] { "field" });
}
@ForwardedFields("field")
@@ -1115,9 +1129,9 @@ public class UdfAnalyzerTest {
@Test
public void testForwardWithBranchingReduce() {
compareAnalyzerResultWithAnnotationsSingleInputWithKeys(ReduceFunction.class, Reduce2.class,
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo<field=String,field2=String>",
- new String[] { "field" });
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ TypeInformation.of(new TypeHint<MyPojo>(){}),
+ new String[] { "field" });
}
private static class NullReturnMapper1 implements MapFunction<String, String> {
@@ -1215,7 +1229,7 @@ public class UdfAnalyzerTest {
public void testFilterModificationException1() {
try {
final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod1.class, "operator",
- TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+ STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true);
ua.analyze();
Assert.fail();
}
@@ -1237,7 +1251,7 @@ public class UdfAnalyzerTest {
public void testFilterModificationException2() {
try {
final UdfAnalyzer ua = new UdfAnalyzer(FilterFunction.class, FilterMod2.class, "operator",
- TypeInfoParser.parse("Tuple2<String, String>"), null, null, null, null, true);
+ STRING_STRING_TUPLE2_TYPE_INFO, null, null, null, null, true);
ua.analyze();
Assert.fail();
}
@@ -1303,17 +1317,14 @@ public class UdfAnalyzerTest {
}
}
- public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz, String in,
- String out) {
- compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, in, out, null);
+ public static void compareAnalyzerResultWithAnnotationsSingleInput(Class<?> baseClass, Class<?> clazz,
+ TypeInformation<?> inType, TypeInformation<?> outType) {
+ compareAnalyzerResultWithAnnotationsSingleInputWithKeys(baseClass, clazz, inType, outType, null);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void compareAnalyzerResultWithAnnotationsSingleInputWithKeys(Class<?> baseClass, Class<?> clazz,
- String in, String out, String[] keys) {
- final TypeInformation<?> inType = TypeInfoParser.parse(in);
- final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+ TypeInformation<?> inType, TypeInformation<?> outType, String[] keys) {
// expected
final Set<Annotation> annotations = FunctionAnnotation.readSingleForwardAnnotations(clazz);
SingleInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsSingle(annotations, inType,
@@ -1331,18 +1342,14 @@ public class UdfAnalyzerTest {
assertEquals(expected.toString(), actual.toString());
}
- public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz, String in1,
- String in2, String out) {
- compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1, in2, out, null, null);
+ public static void compareAnalyzerResultWithAnnotationsDualInput(Class<?> baseClass, Class<?> clazz,
+ TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType) {
+ compareAnalyzerResultWithAnnotationsDualInputWithKeys(baseClass, clazz, in1Type, in2Type, outType, null, null);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void compareAnalyzerResultWithAnnotationsDualInputWithKeys(Class<?> baseClass, Class<?> clazz,
- String in1, String in2, String out, String[] keys1, String[] keys2) {
- final TypeInformation<?> in1Type = TypeInfoParser.parse(in1);
- final TypeInformation<?> in2Type = TypeInfoParser.parse(in2);
- final TypeInformation<?> outType = TypeInfoParser.parse(out);
-
+ TypeInformation<?> in1Type, TypeInformation<?> in2Type, TypeInformation<?> outType, String[] keys1, String[] keys2) {
// expected
final Set<Annotation> annotations = FunctionAnnotation.readDualForwardAnnotations(clazz);
final DualInputSemanticProperties expected = SemanticPropUtil.getSemanticPropsDual(annotations, in1Type,
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index a31ce2e..06f486d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -62,7 +61,7 @@ public class ChecksumHashCodeTest {
@Test
public void testEmptyList() throws Exception {
- DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index cfeadce..027b809 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -61,7 +60,7 @@ public class CollectTest {
@Test
public void testEmptyList() throws Exception {
- DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
List<Long> collected = new Collect<Long>().run(dataset).execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb978b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index 0167a5f..dc92c55 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.graph.asm.dataset;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -60,7 +59,7 @@ public class CountTest {
@Test
public void testEmptyList() throws Exception {
- DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), Types.LONG);
long count = new Count<Long>().run(dataset).execute();
[4/4] flink git commit: [hotfix] [build] Force delete corrupt jar
files from cache
Posted by se...@apache.org.
[hotfix] [build] Force delete corrupt jar files from cache
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3df78090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3df78090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3df78090
Branch: refs/heads/master
Commit: 3df7809027b75032aea0aadb53fd53fb13f90754
Parents: 22531a8
Author: Stephan Ewen <se...@apache.org>
Authored: Wed May 16 22:51:02 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu May 17 15:12:51 2018 +0200
----------------------------------------------------------------------
.travis.yml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3df78090/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 4acdeb0..cad9c87 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -112,7 +112,7 @@ before_install:
- "export PATH=$M2_HOME/bin:$PATH"
- "export MAVEN_OPTS=\"-Dorg.slf4j.simpleLogger.showDateTime=true -Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS\""
# just in case: clean up the .m2 home and remove invalid jar files
- - 'test ! -d $HOME/.m2/repository/ || find $HOME/.m2/repository/ -name "*.jar" -exec sh -c ''if ! zip -T {} >/dev/null ; then echo "deleting invalid file: {}"; rm {} ; fi'' \;'
+ - 'test ! -d $HOME/.m2/repository/ || find $HOME/.m2/repository/ -name "*.jar" -exec sh -c ''if ! zip -T {} >/dev/null ; then echo "deleting invalid file: {}"; rm -f {} ; fi'' \;'
# We run mvn and monitor its output. If there is no output for the specified number of seconds, we
# print the stack traces of all running Java processes.