You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/22 16:52:16 UTC
[1/2] flink git commit: [FLINK-5385] [core] Add a helper method to
create Row object.
Repository: flink
Updated Branches:
refs/heads/master fd392704c -> d163f8416
[FLINK-5385] [core] Add a helper method to create Row object.
This closes #3038
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61ae04fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61ae04fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61ae04fb
Branch: refs/heads/master
Commit: 61ae04fb5dcb0d29efe96f1f19ba35e5ab1e360e
Parents: fd39270
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Dec 22 14:23:57 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 22 16:00:54 2016 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/types/Row.java | 26 ++++++++++++++++++++
.../java/org/apache/flink/types/RowTest.java | 12 +++++++++
2 files changed, 38 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61ae04fb/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index 6825b71..f9a5add 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -113,4 +113,30 @@ public class Row implements Serializable{
public int hashCode() {
+ // Delegates to Arrays.hashCode over the fields array, so the hash depends on every field value.
return Arrays.hashCode(fields);
}
+
+ /**
+ * Creates a new Row and assigns the given values to the Row's fields.
+ * This is more convenient than using the constructor.
+ *
+ * <p>For example:
+ *
+ * <pre>
+ * Row.of("hello", true, 1L);
+ * </pre>
+ * instead of
+ * <pre>
+ * Row row = new Row(3);
+ * row.setField(0, "hello");
+ * row.setField(1, true);
+ * row.setField(2, 1L);
+ * </pre>
+ *
+ * <p>The returned Row has exactly {@code values.length} fields.
+ *
+ * @param values the field values in positional order; individual values may be {@code null}
+ * @return a new Row whose field {@code i} holds {@code values[i]}
+ */
+ public static Row of(Object... values) {
+ Row row = new Row(values.length);
+ for (int i = 0; i < values.length; i++) {
+ row.setField(i, values[i]);
+ }
+ return row;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/61ae04fb/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
index 35ba32d..13a4d6a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -34,4 +34,16 @@ public class RowTest {
assertEquals("1,hello,null,(2,hi),hello world", row.toString());
}
+
+ @Test
+ public void testRowOf() {
+ // expected: a Row populated field by field via the constructor and setField
+ Row expected = new Row(5);
+ expected.setField(0, 1);
+ expected.setField(1, "hello");
+ expected.setField(2, null);
+ expected.setField(3, new Tuple2<>(2L, "hi"));
+ expected.setField(4, true);
+ // actual: the same values passed positionally to the Row.of factory
+ Row actual = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
+ assertEquals(expected, actual);
+ }
}
[2/2] flink git commit: [FLINK-5348] [core] Add support for custom
field names to RowTypeInfo.
Posted by fh...@apache.org.
[FLINK-5348] [core] Add support for custom field names to RowTypeInfo.
This closes #3020
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d163f841
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d163f841
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d163f841
Branch: refs/heads/master
Commit: d163f841610644b51749e9449a5a8be982384fb7
Parents: 61ae04f
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Dec 16 17:24:01 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 22 16:07:53 2016 +0100
----------------------------------------------------------------------
.../flink/api/java/typeutils/RowTypeInfo.java | 148 +++++++++++++++++++
.../api/java/typeutils/RowTypeInfoTest.java | 85 ++++++++++-
2 files changed, 232 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d163f841/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 03cbe61..a1b348a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -19,7 +19,9 @@ package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.RowComparator;
@@ -29,7 +31,13 @@ import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -40,6 +48,20 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
private static final long serialVersionUID = 9158518989896601963L;
+ /** Matches a positional field reference, i.e. one or more ASCII digits such as {@code 0} or {@code 12}. */
+ private static final String REGEX_INT_FIELD = "[0-9]+";
+ /** Matches a named field: a Unicode letter, '_' or '$', followed by letters, digits, '_' or '$'. */
+ private static final String REGEX_STR_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+ /** A named or positional field reference; an ungrouped alternation, so it is wrapped in a group where used. */
+ private static final String REGEX_FIELD = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD;
+ /**
+ * A field reference optionally followed by a dot and a nested expression.
+ * Group 1 is the leading field; group 3 is the unparsed remainder after the first dot, or null if absent.
+ */
+ private static final String REGEX_NESTED_FIELDS = "(" + REGEX_FIELD + ")(\\.(.+))?";
+ /**
+ * Like REGEX_NESTED_FIELDS, but additionally accepts the select-all wildcards as whole expressions.
+ * Each wildcard constant is prefixed with a backslash so it is matched literally
+ * (NOTE(review): this assumes both constants are single characters).
+ */
+ private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + "|\\"
+ + ExpressionKeys.SELECT_ALL_CHAR + "|\\"
+ + ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+ // Compiled once and shared: used by getTypeAt(String), getFlatFields and the integer-field check.
+ private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+ private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+ private static final Pattern PATTERN_INT_FIELD = Pattern.compile(REGEX_INT_FIELD);
+
+ // --------------------------------------------------------------------------------------------
+
protected final String[] fieldNames;
/** Temporary variable for directly passing orders to comparators. */
private boolean[] comparatorOrders = null;
@@ -54,6 +76,122 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
}
}
+ public RowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
+ super(Row.class, types);
+ checkNotNull(fieldNames, "FieldNames should not be null.");
+ checkArgument(
+ types.length == fieldNames.length,
+ "Number of field types and names is different.");
+ checkArgument(
+ !hasDuplicateFieldNames(fieldNames),
+ "Field names are not unique.");
+
+ this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+ }
+
+ /**
+ * Computes the flat (leaf-level) field descriptors selected by a field expression.
+ *
+ * <p>Supported expressions are the select-all wildcards, a field name or zero-based position,
+ * and either of those followed by a dot and a nested expression for composite field types.
+ * Descriptors are appended to {@code result}, with flat positions shifted by {@code offset}.
+ *
+ * @param fieldExpression the field expression to resolve
+ * @param offset the flat position of this row's first field within the enclosing type
+ * @param result the list to which the selected flat fields are appended
+ * @throws InvalidFieldReferenceException if the expression is malformed, names an unknown field,
+ * references a position outside this row, or nests into an atomic type
+ */
+ @Override
+ public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+ Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+
+ if (!matcher.matches()) {
+ throw new InvalidFieldReferenceException(
+ "Invalid tuple field reference \"" + fieldExpression + "\".");
+ }
+
+ String field = matcher.group(0);
+
+ if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
+ (field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
+ // handle select all: expand every field, recursing into composite types
+ int keyPosition = 0;
+ for (TypeInformation<?> fType : types) {
+ if (fType instanceof CompositeType) {
+ CompositeType<?> cType = (CompositeType<?>) fType;
+ cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
+ // a composite field spans getTotalFields() flat positions (the last one is added below)
+ keyPosition += cType.getTotalFields() - 1;
+ } else {
+ result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
+ }
+ keyPosition++;
+ }
+ } else {
+ field = matcher.group(1);
+
+ int fieldIndex;
+ if (PATTERN_INT_FIELD.matcher(field).matches()) {
+ // field expression is a position
+ try {
+ fieldIndex = Integer.parseInt(field);
+ } catch (NumberFormatException ignored) {
+ // digits only, but too large for an int: cannot be a valid position, rejected below
+ fieldIndex = -1;
+ }
+ } else {
+ // field expression is a name (presumably -1 when unknown; any out-of-range index is rejected below)
+ fieldIndex = this.getFieldIndex(field);
+ }
+ if (fieldIndex < 0 || fieldIndex >= types.length) {
+ throw new InvalidFieldReferenceException(
+ "Unknown field \"" + field + "\" in field expression \"" + fieldExpression + "\".");
+ }
+ TypeInformation<?> fieldType = this.getTypeAt(fieldIndex);
+ // shift the offset past all flat fields that precede the selected field
+ for (int i = 0; i < fieldIndex; i++) {
+ offset += this.getTypeAt(i).getTotalFields();
+ }
+
+ String tail = matcher.group(3);
+
+ if (tail == null) {
+ // expression has no nested part: select the whole field, expanding composite types
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType<?>) fieldType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset, result);
+ } else {
+ result.add(new FlatFieldDescriptor(offset, fieldType));
+ }
+ } else {
+ // expression has a nested part: delegate it to the composite field type
+ if (fieldType instanceof CompositeType) {
+ ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
+ } else {
+ throw new InvalidFieldReferenceException(
+ "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the type of the field selected by a field expression.
+ *
+ * <p>The expression is a field name or zero-based position, optionally followed by a dot and a
+ * nested expression that is resolved by the (composite) type of that field. Wildcards are rejected.
+ *
+ * @param fieldExpression the field expression to resolve
+ * @param <X> the expected type of the selected field
+ * @return the type information of the selected field
+ * @throws InvalidFieldReferenceException if the expression is a wildcard, is malformed, names an
+ * unknown field, references a position outside this row, or nests into an atomic type
+ */
+ @Override
+ public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+ Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+ if (!matcher.matches()) {
+ if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
+ fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+ throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+ } else {
+ throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
+ }
+ }
+
+ String field = matcher.group(1);
+
+ int fieldIndex;
+ if (PATTERN_INT_FIELD.matcher(field).matches()) {
+ // field expression is a position
+ try {
+ fieldIndex = Integer.parseInt(field);
+ } catch (NumberFormatException ignored) {
+ // digits only, but too large for an int: cannot be a valid position, rejected below
+ fieldIndex = -1;
+ }
+ } else {
+ // field expression is a name (presumably -1 when unknown; any out-of-range index is rejected below)
+ fieldIndex = this.getFieldIndex(field);
+ }
+ if (fieldIndex < 0 || fieldIndex >= types.length) {
+ throw new InvalidFieldReferenceException(
+ "Unknown field \"" + field + "\" in field expression \"" + fieldExpression + "\".");
+ }
+ TypeInformation<X> fieldType = this.getTypeAt(fieldIndex);
+
+ String tail = matcher.group(3);
+ if (tail == null) {
+ // no nested part: this field is the result
+ return fieldType;
+ } else if (fieldType instanceof CompositeType) {
+ // nested part: let the composite field type resolve the remainder
+ return ((CompositeType<?>) fieldType).getTypeAt(tail);
+ } else {
+ throw new InvalidFieldReferenceException(
+ "Nested field expression \""+ tail + "\" not possible on atomic type "+fieldType+".");
+ }
+ }
+
@Override
public TypeComparator<Row> createComparator(
int[] logicalKeyFields,
@@ -129,6 +267,16 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
return bld.toString();
}
+ /**
+ * Checks whether any name occurs more than once in the given array.
+ *
+ * <p>Static because it is called from the constructor before {@code this.fieldNames} is assigned
+ * and must not depend on instance state.
+ *
+ * @param names the field names to check; {@code null} entries are compared like any other value
+ * @return {@code true} if at least one name is duplicated, {@code false} otherwise
+ */
+ private static boolean hasDuplicateFieldNames(String[] names) {
+ HashSet<String> seen = new HashSet<>(names.length * 2);
+ for (String name : names) {
+ // HashSet.add returns false when an equal element is already present
+ if (!seen.add(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> {
private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
http://git-wip-us.apache.org/repos/asf/flink/blob/d163f841/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
index 8de7bf7..45f616c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -18,12 +18,95 @@
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class RowTypeInfoTest {
+ private static TypeInformation<?>[] typeList = new TypeInformation<?>[]{
+ BasicTypeInfo.INT_TYPE_INFO,
+ new RowTypeInfo(
+ BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.BIG_DEC_TYPE_INFO),
+ BasicTypeInfo.STRING_TYPE_INFO
+ };
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongNumberOfFieldNames() {
+ new RowTypeInfo(typeList, new String[]{"int", "string"});
+ // number of field names should be equal to number of types, go fail
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDuplicateCustomFieldNames() {
+ new RowTypeInfo(typeList, new String[]{"int", "string", "string"});
+ // field names should not be the same, go fail
+ }
+
+ @Test
+ public void testCustomFieldNames() {
+ String[] fieldNames = new String[]{"int", "row", "string"};
+ // build from the very array that is mutated below, so the defensive copy is actually exercised
+ RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, fieldNames);
+ assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt("string"));
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt(2));
+ assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo1.getTypeAt("row.0"));
+ assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo1.getTypeAt("row.f1"));
+
+ // change the names in fieldNames after typeInfo1 has been created from it
+ fieldNames[1] = "composite";
+ RowTypeInfo typeInfo2 = new RowTypeInfo(typeList, fieldNames);
+ // make sure the field names are deep copied: typeInfo1 must not observe the change
+ assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+ assertArrayEquals(new String[]{"int", "composite", "string"}, typeInfo2.getFieldNames());
+ }
+
+ @Test
+ public void testGetFlatFields() {
+ RowTypeInfo typeInfo = new RowTypeInfo(typeList, new String[]{"int", "row", "string"});
+ List<FlatFieldDescriptor> flatFields = new ArrayList<>();
+
+ // "row.*" expands the nested row into its two leaf fields at flat positions 1 and 2
+ typeInfo.getFlatFields("row.*", 0, flatFields);
+ assertEquals(2, flatFields.size());
+ FlatFieldDescriptor expectedShort = new FlatFieldDescriptor(1, BasicTypeInfo.SHORT_TYPE_INFO);
+ FlatFieldDescriptor expectedBigDec = new FlatFieldDescriptor(2, BasicTypeInfo.BIG_DEC_TYPE_INFO);
+ assertEquals(expectedShort.toString(), flatFields.get(0).toString());
+ assertEquals(expectedBigDec.toString(), flatFields.get(1).toString());
+
+ // "string" follows the int (1 flat field) and the nested row (2 flat fields): flat position 3
+ flatFields.clear();
+ typeInfo.getFlatFields("string", 0, flatFields);
+ assertEquals(1, flatFields.size());
+ FlatFieldDescriptor expectedString = new FlatFieldDescriptor(3, BasicTypeInfo.STRING_TYPE_INFO);
+ assertEquals(expectedString.toString(), flatFields.get(0).toString());
+ }
+
+ @Test
+ public void testGetTypeAt() {
+ RowTypeInfo nestedRow = new RowTypeInfo(
+ BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.BIG_DEC_TYPE_INFO);
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ nestedRow,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ // without custom names, the fields are expected to be named f0, f1, f2 by position
+ assertArrayEquals(new String[]{"f0", "f1", "f2"}, typeInfo.getFieldNames());
+
+ // top-level field by name, nested field by name, nested field by position
+ assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo.getTypeAt("f2"));
+ assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo.getTypeAt("f1.f0"));
+ assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo.getTypeAt("f1.1"));
+ }
@Test
public void testRowTypeInfoEquality() {
@@ -59,7 +142,7 @@ public class RowTypeInfoTest {
BasicTypeInfo.INT_TYPE_INFO,
new RowTypeInfo(
BasicTypeInfo.SHORT_TYPE_INFO,
- BasicTypeInfo.BIG_DEC_TYPE_INFO
+ BasicTypeInfo.BIG_DEC_TYPE_INFO
),
BasicTypeInfo.STRING_TYPE_INFO);