You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/22 16:52:16 UTC

[1/2] flink git commit: [FLINK-5385] [core] Add a helper method to create Row object.

Repository: flink
Updated Branches:
  refs/heads/master fd392704c -> d163f8416


[FLINK-5385] [core] Add a helper method to create Row object.

This closes #3038


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61ae04fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61ae04fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61ae04fb

Branch: refs/heads/master
Commit: 61ae04fb5dcb0d29efe96f1f19ba35e5ab1e360e
Parents: fd39270
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Dec 22 14:23:57 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 22 16:00:54 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/types/Row.java   | 26 ++++++++++++++++++++
 .../java/org/apache/flink/types/RowTest.java    | 12 +++++++++
 2 files changed, 38 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61ae04fb/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index 6825b71..f9a5add 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -113,4 +113,30 @@ public class Row implements Serializable{
 	public int hashCode() {
 		return Arrays.hashCode(fields);
 	}
+
+	/**
+	 * Creates a new Row and assigns the given values to the Row's fields.
+	 * This is more convenient than using the constructor.
+	 *
+	 * <p>For example:
+	 *
+	 * <pre>
+	 *     Row.of("hello", true, 1L);
+	 * </pre>
+	 * instead of
+	 * <pre>
+	 *     Row row = new Row(3);
+	 *     row.setField(0, "hello");
+	 *     row.setField(1, true);
+	 *     row.setField(2, 1L);
+	 * </pre>
+	 *
+	 */
+	public static Row of(Object... values) {
+		Row row = new Row(values.length);
+		for (int i = 0; i < values.length; i++) {
+			row.setField(i, values[i]);
+		}
+		return row;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/61ae04fb/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
index 35ba32d..13a4d6a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -34,4 +34,16 @@ public class RowTest {
 
 		assertEquals("1,hello,null,(2,hi),hello world", row.toString());
 	}
+
+	@Test
+	public void testRowOf() {
+		Row row1 = Row.of(1, "hello", null, Tuple2.of(2L, "hi"), true);
+		Row row2 = new Row(5);
+		row2.setField(0, 1);
+		row2.setField(1, "hello");
+		row2.setField(2, null);
+		row2.setField(3, new Tuple2<>(2L, "hi"));
+		row2.setField(4, true);
+		assertEquals(row1, row2);
+	}
 }


[2/2] flink git commit: [FLINK-5348] [core] Add support for custom field names to RowTypeInfo.

Posted by fh...@apache.org.
[FLINK-5348] [core] Add support for custom field names to RowTypeInfo.

This closes #3020


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d163f841
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d163f841
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d163f841

Branch: refs/heads/master
Commit: d163f841610644b51749e9449a5a8be982384fb7
Parents: 61ae04f
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Dec 16 17:24:01 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 22 16:07:53 2016 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/RowTypeInfo.java   | 148 +++++++++++++++++++
 .../api/java/typeutils/RowTypeInfoTest.java     |  85 ++++++++++-
 2 files changed, 232 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d163f841/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index 03cbe61..a1b348a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -19,7 +19,9 @@ package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.RowComparator;
@@ -29,7 +31,13 @@ import org.apache.flink.types.Row;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -40,6 +48,20 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 
 	private static final long serialVersionUID = 9158518989896601963L;
 
+	private static final String REGEX_INT_FIELD = "[0-9]+";
+	private static final String REGEX_STR_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
+	private static final String REGEX_FIELD = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD;
+	private static final String REGEX_NESTED_FIELDS = "(" + REGEX_FIELD + ")(\\.(.+))?";
+	private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + "|\\"
+			+ ExpressionKeys.SELECT_ALL_CHAR + "|\\"
+			+ ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+	private static final Pattern PATTERN_INT_FIELD = Pattern.compile(REGEX_INT_FIELD);
+
+	// --------------------------------------------------------------------------------------------
+
 	protected final String[] fieldNames;
 	/** Temporary variable for directly passing orders to comparators. */
 	private boolean[] comparatorOrders = null;
@@ -54,6 +76,122 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 		}
 	}
 
+	public RowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
+		super(Row.class, types);
+		checkNotNull(fieldNames, "FieldNames should not be null.");
+		checkArgument(
+			types.length == fieldNames.length,
+			"Number of field types and names is different.");
+		checkArgument(
+			!hasDuplicateFieldNames(fieldNames),
+			"Field names are not unique.");
+
+		this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
+	}
+
+	@Override
+	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+
+		if (!matcher.matches()) {
+			throw new InvalidFieldReferenceException(
+				"Invalid tuple field reference \"" + fieldExpression + "\".");
+		}
+
+		String field = matcher.group(0);
+
+		if ((field.equals(ExpressionKeys.SELECT_ALL_CHAR)) ||
+			(field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA))) {
+			// handle select all
+			int keyPosition = 0;
+			for (TypeInformation<?> fType : types) {
+				if (fType instanceof CompositeType) {
+					CompositeType<?> cType = (CompositeType<?>) fType;
+					cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
+					keyPosition += cType.getTotalFields() - 1;
+				} else {
+					result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
+				}
+				keyPosition++;
+			}
+		} else {
+			field = matcher.group(1);
+
+			Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+			int fieldIndex;
+			if (intFieldMatcher.matches()) {
+				// field expression is an integer
+				fieldIndex = Integer.valueOf(field);
+			} else {
+				fieldIndex = this.getFieldIndex(field);
+			}
+			// fetching the field type throws an exception if the index is illegal
+			TypeInformation<?> fieldType = this.getTypeAt(fieldIndex);
+			// compute the flat offset by adding the total number of fields of all preceding fields
+			for (int i = 0; i < fieldIndex; i++) {
+				offset += this.getTypeAt(i).getTotalFields();
+			}
+
+			String tail = matcher.group(3);
+
+			if (tail == null) {
+				// expression has no nested field
+				if (fieldType instanceof CompositeType) {
+					((CompositeType) fieldType).getFlatFields("*", offset, result);
+				} else {
+					result.add(new FlatFieldDescriptor(offset, fieldType));
+				}
+			} else {
+				// expression has a nested field
+				if (fieldType instanceof CompositeType) {
+					((CompositeType) fieldType).getFlatFields(tail, offset, result);
+				} else {
+					throw new InvalidFieldReferenceException(
+						"Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
+				}
+			}
+		}
+	}
+
+	@Override
+	public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
+		Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
+		if (!matcher.matches()) {
+			if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
+				fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+				throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
+			} else {
+				throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
+			}
+		}
+
+		String field = matcher.group(1);
+
+		Matcher intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+		int fieldIndex;
+		if (intFieldMatcher.matches()) {
+			// field expression is an integer
+			fieldIndex = Integer.valueOf(field);
+		} else {
+			fieldIndex = this.getFieldIndex(field);
+		}
+		// fetching the field type throws an exception if the index is illegal
+		TypeInformation<X> fieldType = this.getTypeAt(fieldIndex);
+
+		String tail = matcher.group(3);
+		if (tail == null) {
+			// found the type
+			return fieldType;
+		} else {
+			if (fieldType instanceof CompositeType) {
+				return ((CompositeType<?>) fieldType).getTypeAt(tail);
+			} else {
+				throw new InvalidFieldReferenceException(
+					"Nested field expression \""+ tail + "\" not possible on atomic type "+fieldType+".");
+			}
+		}
+	}
+
 	@Override
 	public TypeComparator<Row> createComparator(
 		int[] logicalKeyFields,
@@ -129,6 +267,16 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 		return bld.toString();
 	}
 
+	private boolean hasDuplicateFieldNames(String[] fieldNames) {
+		HashSet<String> names = new HashSet<>();
+		for (String field : fieldNames) {
+			if (!names.add(field)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> {
 
 		private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d163f841/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
index 8de7bf7..45f616c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -18,12 +18,95 @@
 package org.apache.flink.api.java.typeutils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 public class RowTypeInfoTest {
+	private static TypeInformation<?>[] typeList = new TypeInformation<?>[]{
+		BasicTypeInfo.INT_TYPE_INFO,
+		new RowTypeInfo(
+			BasicTypeInfo.SHORT_TYPE_INFO,
+			BasicTypeInfo.BIG_DEC_TYPE_INFO),
+		BasicTypeInfo.STRING_TYPE_INFO
+	};
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrongNumberOfFieldNames() {
+		new RowTypeInfo(typeList, new String[]{"int", "string"});
+		// the number of field names must equal the number of types, so this must fail
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDuplicateCustomFieldNames() {
+		new RowTypeInfo(typeList, new String[]{"int", "string", "string"});
+		// field names must be unique, so this must fail
+	}
+
+	@Test
+	public void testCustomFieldNames() {
+		String[] fieldNames = new String[]{"int", "row", "string"};
+		RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, new String[]{"int", "row", "string"});
+		assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt("string"));
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo1.getTypeAt(2));
+		assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo1.getTypeAt("row.0"));
+		assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo1.getTypeAt("row.f1"));
+
+		// change the names in fieldNames
+		fieldNames[1] = "composite";
+		RowTypeInfo typeInfo2 = new RowTypeInfo(typeList, fieldNames);
+		// make sure the field names array is copied and not shared with the caller
+		assertArrayEquals(new String[]{"int", "row", "string"}, typeInfo1.getFieldNames());
+		assertArrayEquals(new String[]{"int", "composite", "string"}, typeInfo2.getFieldNames());
+	}
+
+	@Test
+	public void testGetFlatFields() {
+		RowTypeInfo typeInfo1 = new RowTypeInfo(typeList, new String[]{"int", "row", "string"});
+		List<FlatFieldDescriptor> result = new ArrayList<>();
+		typeInfo1.getFlatFields("row.*", 0, result);
+		assertEquals(2, result.size());
+		assertEquals(
+			new FlatFieldDescriptor(1, BasicTypeInfo.SHORT_TYPE_INFO).toString(),
+			result.get(0).toString());
+		assertEquals(
+			new FlatFieldDescriptor(2, BasicTypeInfo.BIG_DEC_TYPE_INFO).toString(),
+			result.get(1).toString());
+
+		result.clear();
+		typeInfo1.getFlatFields("string", 0, result);
+		assertEquals(1, result.size());
+		assertEquals(
+			new FlatFieldDescriptor(3, BasicTypeInfo.STRING_TYPE_INFO).toString(),
+			result.get(0).toString());
+	}
+
+	@Test
+	public void testGetTypeAt() {
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			new RowTypeInfo(
+				BasicTypeInfo.SHORT_TYPE_INFO,
+				BasicTypeInfo.BIG_DEC_TYPE_INFO
+			),
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+
+		assertArrayEquals(new String[]{"f0", "f1", "f2"}, typeInfo.getFieldNames());
+
+		assertEquals(BasicTypeInfo.STRING_TYPE_INFO, typeInfo.getTypeAt("f2"));
+		assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, typeInfo.getTypeAt("f1.f0"));
+		assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, typeInfo.getTypeAt("f1.1"));
+	}
 
 	@Test
 	public void testRowTypeInfoEquality() {
@@ -59,7 +142,7 @@ public class RowTypeInfoTest {
 			BasicTypeInfo.INT_TYPE_INFO,
 			new RowTypeInfo(
 				BasicTypeInfo.SHORT_TYPE_INFO,
-			    BasicTypeInfo.BIG_DEC_TYPE_INFO
+				BasicTypeInfo.BIG_DEC_TYPE_INFO
 			),
 			BasicTypeInfo.STRING_TYPE_INFO);