Posted to commits@flink.apache.org by tw...@apache.org on 2020/07/14 08:36:31 UTC

[flink] branch master updated (ca6cef9 -> 5aa1d96)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ca6cef9  [FLINK-18491][python] Extract the Beam specific coder classes into a separate Python module
     new 446bc8b  [hotfix][table-planner-blink] Fix digest for inline structured types with generics
     new c7beb3f  [hotfix][table-planner-blink] Fix row size estimation for structured types
     new 104775e  [FLINK-18586][table-common] Simplify the creation of explicit structured types
     new 5aa1d96  [hotfix][table-common] Update DataTypeUtils.transform for structured types

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/table/api/DataTypes.java | 53 +++++++++++++
 .../table/types/extraction/DataTypeExtractor.java  | 25 ++----
 .../table/types/extraction/ExtractionUtils.java    | 34 ++++-----
 .../flink/table/types/utils/DataTypeUtils.java     | 88 +++++++++++++++++-----
 .../apache/flink/table/types/DataTypesTest.java    | 27 ++++++-
 .../types/inference/TypeTransformationsTest.java   | 38 ++++++++++
 .../planner/plan/schema/StructuredRelDataType.java | 11 ++-
 .../planner/plan/metadata/FlinkRelMdSize.scala     |  2 +-
 8 files changed, 221 insertions(+), 57 deletions(-)


[flink] 01/04: [hotfix][table-planner-blink] Fix digest for inline structured types with generics

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 446bc8b003c86100a904c66bb90ef2bba73309d2
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 14:28:14 2020 +0200

    [hotfix][table-planner-blink] Fix digest for inline structured types with generics
---
 .../table/planner/plan/schema/StructuredRelDataType.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
index 4593a28..614b310 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * The {@link RelDataType} representation of a {@link StructuredType}.
@@ -98,9 +99,17 @@ public final class StructuredRelDataType extends ObjectSqlType {
 				sb.append(structuredType.asSerializableString());
 			}
 			// in case of inline structured type we are using a temporary identifier
+			// that includes both the implementation class plus its children for cases with classes
+			// that use generics
 			else {
 				sb.append(structuredType.asSummaryString());
-				if (structuredType.isNullable()) {
+				sb.append("(");
+				sb.append(
+					fieldList.stream()
+						.map(field -> field.getType().getFullTypeString())
+						.collect(Collectors.joining(", ")));
+				sb.append(")");
+				if (!structuredType.isNullable()) {
 					sb.append(" NOT NULL");
 				}
 			}
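
For readers skimming the diff, here is a standalone Java sketch of the digest format that the fixed branch produces. The class name and field type strings are hypothetical placeholders, not real Flink output; the point is that the children's full type strings are now part of the digest, so two inline structured types backed by the same generic implementation class but with different type arguments no longer collide.

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the digest built by the fixed branch above.
public class StructuredDigestSketch {

    static String digest(String summaryString, List<String> fieldTypeStrings, boolean nullable) {
        final StringBuilder sb = new StringBuilder();
        sb.append(summaryString);                       // structuredType.asSummaryString()
        sb.append("(");
        sb.append(String.join(", ", fieldTypeStrings)); // field.getType().getFullTypeString()
        sb.append(")");
        if (!nullable) {                                // the old code appended NOT NULL when nullable
            sb.append(" NOT NULL");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: MyGenericPojo(VARCHAR(2147483647), INT NOT NULL) NOT NULL
        System.out.println(digest(
            "MyGenericPojo",
            Arrays.asList("VARCHAR(2147483647)", "INT NOT NULL"),
            false));
    }
}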


[flink] 04/04: [hotfix][table-common] Update DataTypeUtils.transform for structured types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aa1d96f0e846f09a883d5a0b414048576e3c001
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 15:47:52 2020 +0200

    [hotfix][table-common] Update DataTypeUtils.transform for structured types
---
 .../flink/table/types/utils/DataTypeUtils.java     | 88 +++++++++++++++++-----
 .../types/inference/TypeTransformationsTest.java   | 38 ++++++++++
 2 files changed, 109 insertions(+), 17 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
index 29bb6a7..e1bdaf6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
@@ -39,13 +40,16 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -84,12 +88,17 @@ public final class DataTypeUtils {
 	}
 
 	/**
-	 * Transforms the given data type (can be nested) to a different data type using the given
-	 * transformations. The given transformations will be called in order.
+	 * Transforms the given data type to a different data type using the given transformations.
+	 *
+	 * <p>The transformations will be called in the given order. In case of constructed or composite
+	 * types, a transformation will be applied transitively to children first.
+	 *
+	 * <p>Both the {@link DataType#getLogicalType()} and {@link DataType#getConversionClass()} can be
+	 * transformed.
 	 *
 	 * @param typeToTransform data type to be transformed.
 	 * @param transformations the transformations to transform data type to another type.
-	 * @return the new data type,
+	 * @return the new data type
 	 */
 	public static DataType transform(DataType typeToTransform, TypeTransformation... transformations) {
 		Preconditions.checkArgument(transformations.length > 0, "transformations should not be empty.");
@@ -183,6 +192,12 @@ public final class DataTypeUtils {
 		}
 	}
 
+	/**
+	 * Transforms a {@link DataType}.
+	 *
+	 * <p>In case of constructed or composite types, a transformation will be applied transitively to
+	 * children first.
+	 */
 	private static class DataTypeTransformer implements DataTypeVisitor<DataType> {
 
 		private final TypeTransformation transformation;
@@ -198,9 +213,9 @@ public final class DataTypeUtils {
 
 		@Override
 		public DataType visit(CollectionDataType collectionDataType) {
-			DataType newElementType = collectionDataType.getElementDataType().accept(this);
-			LogicalType logicalType = collectionDataType.getLogicalType();
-			LogicalType newLogicalType;
+			final DataType newElementType = collectionDataType.getElementDataType().accept(this);
+			final LogicalType logicalType = collectionDataType.getLogicalType();
+			final LogicalType newLogicalType;
 			if (logicalType instanceof ArrayType) {
 				newLogicalType = new ArrayType(
 					logicalType.isNullable(),
@@ -217,37 +232,60 @@ public final class DataTypeUtils {
 
 		@Override
 		public DataType visit(FieldsDataType fieldsDataType) {
-			final List<DataType> newFields = fieldsDataType.getChildren().stream()
+			final List<DataType> newDataTypes = fieldsDataType.getChildren().stream()
 				.map(dt -> dt.accept(this))
 				.collect(Collectors.toList());
 
 			final LogicalType logicalType = fieldsDataType.getLogicalType();
 			final LogicalType newLogicalType;
 			if (logicalType instanceof RowType) {
-				final List<RowType.RowField> oldFields = ((RowType) logicalType).getFields();
-				final List<RowType.RowField> newRowFields = IntStream.range(0, oldFields.size())
+				final List<RowField> oldFields = ((RowType) logicalType).getFields();
+				final List<RowField> newFields = IntStream.range(0, oldFields.size())
 					.mapToObj(i ->
-						new RowType.RowField(
+						new RowField(
 							oldFields.get(i).getName(),
-							newFields.get(i).getLogicalType(),
+							newDataTypes.get(i).getLogicalType(),
 							oldFields.get(i).getDescription().orElse(null)))
 					.collect(Collectors.toList());
 
 				newLogicalType = new RowType(
 					logicalType.isNullable(),
-					newRowFields);
+					newFields);
+			} else if (logicalType instanceof StructuredType) {
+				final StructuredType structuredType = (StructuredType) logicalType;
+				if (structuredType.getSuperType().isPresent()) {
+					throw new UnsupportedOperationException("Hierarchies of structured types are not supported yet.");
+				}
+				final List<StructuredAttribute> oldAttributes = structuredType.getAttributes();
+				final List<StructuredAttribute> newAttributes = IntStream.range(0, oldAttributes.size())
+					.mapToObj(i ->
+						new StructuredAttribute(
+							oldAttributes.get(i).getName(),
+							newDataTypes.get(i).getLogicalType(),
+							oldAttributes.get(i).getDescription().orElse(null)))
+					.collect(Collectors.toList());
+
+				final StructuredType.Builder builder = createStructuredBuilder(structuredType);
+				builder.attributes(newAttributes);
+				builder.setNullable(structuredType.isNullable());
+				builder.setFinal(structuredType.isFinal());
+				builder.setInstantiable(structuredType.isInstantiable());
+				builder.comparision(structuredType.getComparision());
+				structuredType.getDescription().ifPresent(builder::description);
+
+				newLogicalType = builder.build();
 			} else {
 				throw new UnsupportedOperationException("Unsupported logical type : " + logicalType);
 			}
-			return transformation.transform(new FieldsDataType(newLogicalType, newFields));
+			return transformation.transform(new FieldsDataType(newLogicalType, newDataTypes));
 		}
 
 		@Override
 		public DataType visit(KeyValueDataType keyValueDataType) {
-			DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
-			DataType newValueType = keyValueDataType.getValueDataType().accept(this);
-			LogicalType logicalType = keyValueDataType.getLogicalType();
-			LogicalType newLogicalType;
+			final DataType newKeyType = keyValueDataType.getKeyDataType().accept(this);
+			final DataType newValueType = keyValueDataType.getValueDataType().accept(this);
+			final LogicalType logicalType = keyValueDataType.getLogicalType();
+			final LogicalType newLogicalType;
 			if (logicalType instanceof MapType) {
 				newLogicalType = new MapType(
 					logicalType.isNullable(),
@@ -258,6 +296,22 @@ public final class DataTypeUtils {
 			}
 			return transformation.transform(new KeyValueDataType(newLogicalType, newKeyType, newValueType));
 		}
+
+		// ----------------------------------------------------------------------------------------
+
+		private StructuredType.Builder createStructuredBuilder(StructuredType structuredType) {
+			final Optional<ObjectIdentifier> identifier = structuredType.getObjectIdentifier();
+			final Optional<Class<?>> implementationClass = structuredType.getImplementationClass();
+			if (identifier.isPresent() && implementationClass.isPresent()) {
+				return StructuredType.newBuilder(identifier.get(), implementationClass.get());
+			} else if (identifier.isPresent()) {
+				return StructuredType.newBuilder(identifier.get());
+			} else if (implementationClass.isPresent()) {
+				return StructuredType.newBuilder(implementationClass.get());
+			} else {
+				throw new IllegalArgumentException("Invalid structured type.");
+			}
+		}
 	}
 
 	private static TableSchema expandCompositeType(FieldsDataType dataType) {
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
index 64c0342..ff33d5e 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/TypeTransformationsTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -32,6 +34,7 @@ import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 
+import static org.apache.flink.table.types.inference.TypeTransformations.TO_INTERNAL_CLASS;
 import static org.apache.flink.table.types.inference.TypeTransformations.legacyDecimalToDefaultDecimal;
 import static org.apache.flink.table.types.inference.TypeTransformations.legacyRawToTypeInfoRaw;
 import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
@@ -44,6 +47,22 @@ import static org.junit.Assert.assertEquals;
 public class TypeTransformationsTest {
 
 	@Test
+	public void testToInternal() {
+		DataType dataType = DataTypes.STRUCTURED(
+			SimplePojo.class,
+			DataTypes.FIELD("name", DataTypes.STRING()),
+			DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class)));
+
+		DataType expected = DataTypes.STRUCTURED(
+				SimplePojo.class,
+				DataTypes.FIELD("name", DataTypes.STRING().bridgedTo(StringData.class)),
+				DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(Integer.class)))
+			.bridgedTo(RowData.class);
+
+		assertEquals(expected, DataTypeUtils.transform(dataType, TO_INTERNAL_CLASS));
+	}
+
+	@Test
 	public void testTimeToSqlTypes() {
 		DataType dataType = DataTypes.ROW(
 			DataTypes.FIELD("a", DataTypes.STRING()),
@@ -130,6 +149,8 @@ public class TypeTransformationsTest {
 		assertEquals(expected, DataTypeUtils.transform(dataType, toNullable()));
 	}
 
+	// --------------------------------------------------------------------------------------------
+
 	private static DataType createLegacyDecimal() {
 		return TypeConversions.fromLegacyInfoToDataType(Types.BIG_DEC);
 	}
@@ -137,4 +158,21 @@ public class TypeTransformationsTest {
 	private static DataType createLegacyRaw() {
 		return TypeConversions.fromLegacyInfoToDataType(Types.GENERIC(TypeTransformationsTest.class));
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Simple POJO for testing.
+	 */
+	public static class SimplePojo {
+		public final String name;
+		public final int count;
+
+		public SimplePojo(String name, int count) {
+			this.name = name;
+			this.count = count;
+		}
+	}
 }
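
To complement the updated Javadoc of DataTypeUtils.transform above, a small usage sketch (not part of the commit): transformations run in the given order and are applied to the children of composite types such as ROW or STRUCTURED before the type itself. The exact printed result depends on the transformation implementations.

import static org.apache.flink.table.types.inference.TypeTransformations.timeToSqlTypes;
import static org.apache.flink.table.types.inference.TypeTransformations.toNullable;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;

public class TransformUsageSketch {

    public static void main(String[] args) {
        final DataType rowType = DataTypes.ROW(
            DataTypes.FIELD("ts", DataTypes.TIME().notNull()),
            DataTypes.FIELD("name", DataTypes.STRING().notNull()));

        // first map time-related types to their java.sql conversion classes,
        // then make every (nested) type nullable
        final DataType transformed = DataTypeUtils.transform(rowType, timeToSqlTypes(), toNullable());
        System.out.println(transformed);
    }
}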


[flink] 02/04: [hotfix][table-planner-blink] Fix row size estimation for structured types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7beb3f6f43e6e2dd969b916bf72bb698cf18a83
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 14:31:59 2020 +0200

    [hotfix][table-planner-blink] Fix row size estimation for structured types
---
 .../org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
index d3fa169..c4debe0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdSize.scala
@@ -390,7 +390,7 @@ object FlinkRelMdSize {
     BuiltInMethod.AVERAGE_ROW_SIZE.method)
 
   def averageTypeValueSize(t: RelDataType): JDouble = t.getSqlTypeName match {
-    case SqlTypeName.ROW =>
+    case SqlTypeName.ROW | SqlTypeName.STRUCTURED =>
       estimateRowSize(t)
     case SqlTypeName.ARRAY =>
       // 16 is an arbitrary estimate
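
For intuition, a deliberately simplified Java sketch of per-field size estimation (hypothetical; Flink's real logic lives in FlinkRelMdSize.estimateRowSize and is more involved). Before this one-line fix, STRUCTURED types did not match the ROW branch and therefore did not take this per-field estimation path.

import java.util.Arrays;
import java.util.List;

// Hypothetical: approximate the average size of a struct-like value by
// summing the average size estimates of its fields.
public class RowSizeSketch {

    static double estimateRowSize(List<Double> averageFieldSizes) {
        return averageFieldSizes.stream().mapToDouble(Double::doubleValue).sum();
    }

    public static void main(String[] args) {
        // e.g. a VARCHAR, an INT and a DOUBLE field
        System.out.println(estimateRowSize(Arrays.asList(12.0, 4.0, 8.0))); // 24.0
    }
}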


[flink] 03/04: [FLINK-18586][table-common] Simplify the creation of explicit structured types

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 104775e1d0fe072da4b00851828af1a1df6cb8dd
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 13 15:17:50 2020 +0200

    [FLINK-18586][table-common] Simplify the creation of explicit structured types
    
    This closes #12887.
---
 .../java/org/apache/flink/table/api/DataTypes.java | 53 ++++++++++++++++++++++
 .../table/types/extraction/DataTypeExtractor.java  | 25 ++--------
 .../table/types/extraction/ExtractionUtils.java    | 34 +++++++-------
 .../apache/flink/table/types/DataTypesTest.java    | 27 ++++++++++-
 4 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
index 396e7d0..021d616 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.CollectionDataType;
@@ -52,6 +53,8 @@ import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.logical.TimeType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
@@ -74,6 +77,8 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.types.extraction.ExtractionUtils.validateStructuredClass;
+
 /**
  * A {@link DataType} can be used to declare input and/or output types of operations. This class
  * enumerates all pre-defined data types of the Table & SQL API.
@@ -786,6 +791,54 @@ public final class DataTypes {
 		return new AtomicDataType(new TypeInformationRawType<>(typeInformation));
 	}
 
+	/**
+	 * Data type of a user-defined object structured type. Structured types contain zero, one or more
+	 * attributes. Each attribute consists of a name and a type. A type cannot be defined so that one of
+	 * its attribute types (transitively) uses itself.
+	 *
+	 * <p>There are two kinds of structured types. Types that are stored in a catalog and are identified
+	 * by an {@link ObjectIdentifier} or anonymously defined, unregistered types (usually reflectively
+	 * extracted) that are identified by an implementation {@link Class}.
+	 *
+	 * <p>This method helps in manually constructing anonymous, unregistered types. This is useful in
+	 * cases where the reflective extraction using {@link DataTypes#of(Class)} is not applicable. However,
+	 * {@link DataTypes#of(Class)} is the recommended way of creating inline structured types as it also
+	 * considers {@link DataTypeHint}s.
+	 *
+	 * <p>Structured types are converted to internal data structures by the runtime. The given implementation
+	 * class is only used at the edges of the table ecosystem (e.g. when bridging to a function or connector).
+	 * Serialization and equality ({@code hashCode/equals}) are handled by the runtime based on the logical
+	 * type. An implementation class must offer a default constructor with zero arguments or a full constructor
+	 * that assigns all attributes.
+	 *
+	 * <p>Note: A caller of this method must make sure that the {@link DataType#getConversionClass()} of the
+	 * given fields matches with the attributes of the given implementation class, otherwise an exception
+	 * might be thrown during runtime.
+	 *
+	 * @see DataTypes#of(Class)
+	 * @see StructuredType
+	 */
+	public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fields) {
+		// some basic validation of the class to prevent common mistakes
+		validateStructuredClass(implementationClass);
+
+		final StructuredType.Builder builder = StructuredType.newBuilder(implementationClass);
+		final List<StructuredAttribute> attributes = Stream.of(fields)
+			.map(f ->
+				new StructuredAttribute(
+					f.getName(),
+					f.getDataType().getLogicalType(),
+					f.getDescription().orElse(null)))
+			.collect(Collectors.toList());
+		builder.attributes(attributes);
+		builder.setFinal(true);
+		builder.setInstantiable(true);
+		final List<DataType> fieldDataTypes = Stream.of(fields)
+			.map(DataTypes.Field::getDataType)
+			.collect(Collectors.toList());
+		return new FieldsDataType(builder.build(), implementationClass, fieldDataTypes);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Helper functions
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
index 3b20183..493b5d1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java
@@ -23,10 +23,7 @@ import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
 import org.apache.flink.table.types.utils.ClassDataTypeConverter;
 import org.apache.flink.types.Row;
 
@@ -515,20 +512,11 @@ public final class DataTypeExtractor {
 			type,
 			fields);
 
-		final List<StructuredAttribute> attributes = createStructuredTypeAttributes(
+		final DataTypes.Field[] attributes = createStructuredTypeAttributes(
 			constructor,
 			fieldDataTypes);
 
-		final StructuredType.Builder builder = StructuredType.newBuilder(clazz);
-		builder.attributes(attributes);
-		builder.setFinal(true); // anonymous structured types should not allow inheritance
-		builder.setInstantiable(true);
-		return new FieldsDataType(
-			builder.build(),
-			clazz,
-			attributes.stream()
-				.map(a -> fieldDataTypes.get(a.getName()))
-				.collect(Collectors.toList()));
+		return DataTypes.STRUCTURED(clazz, attributes);
 	}
 
 	private Map<String, DataType> extractStructuredTypeFields(
@@ -560,7 +548,7 @@ public final class DataTypeExtractor {
 		return fieldDataTypes;
 	}
 
-	private List<StructuredAttribute> createStructuredTypeAttributes(
+	private DataTypes.Field[] createStructuredTypeAttributes(
 			ExtractionUtils.AssigningConstructor constructor,
 			Map<String, DataType> fieldDataTypes) {
 		return Optional.ofNullable(constructor)
@@ -572,11 +560,8 @@ public final class DataTypeExtractor {
 				// field order is sorted
 				return fieldDataTypes.keySet().stream().sorted();
 			})
-			.map(name -> {
-				final LogicalType logicalType = fieldDataTypes.get(name).getLogicalType();
-				return new StructuredAttribute(name, logicalType);
-			})
-			.collect(Collectors.toList());
+			.map(name -> DataTypes.FIELD(name, fieldDataTypes.get(name)))
+			.toArray(DataTypes.Field[]::new);
 	}
 
 	/**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
index e814fb6..2ed11ff 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java
@@ -146,6 +146,23 @@ public final class ExtractionUtils {
 	}
 
 	/**
+	 * Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
+	 */
+	public static void validateStructuredClass(Class<?> clazz) {
+		final int m = clazz.getModifiers();
+		if (Modifier.isAbstract(m)) {
+			throw extractionError("Class '%s' must not be abstract.", clazz.getName());
+		}
+		if (!Modifier.isPublic(m)) {
+			throw extractionError("Class '%s' is not public.", clazz.getName());
+		}
+		if (clazz.getEnclosingClass() != null &&
+				(clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
+			throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName());
+		}
+	}
+
+	/**
 	 * Returns the field of a structured type. The logic is as broad as possible to support
 	 * both Java and Scala in different flavors.
 	 */
@@ -434,23 +451,6 @@ public final class ExtractionUtils {
 	}
 
 	/**
-	 * Validates the characteristics of a class for a {@link StructuredType} such as accessibility.
-	 */
-	static void validateStructuredClass(Class<?> clazz) {
-		final int m = clazz.getModifiers();
-		if (Modifier.isAbstract(m)) {
-			throw extractionError("Class '%s' must not be abstract.", clazz.getName());
-		}
-		if (!Modifier.isPublic(m)) {
-			throw extractionError("Class '%s' is not public.", clazz.getName());
-		}
-		if (clazz.getEnclosingClass() != null &&
-				(clazz.getDeclaringClass() == null || !Modifier.isStatic(m))) {
-			throw extractionError("Class '%s' is a not a static, globally accessible class.", clazz.getName());
-		}
-	}
-
-	/**
 	 * Validates if a given type is not already contained in the type hierarchy of a structured type.
 	 *
 	 * <p>Otherwise this would lead to infinite data type extraction cycles.
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
index 072534b..6552b80 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java
@@ -357,7 +357,15 @@ public class DataTypesTest {
 				.forUnresolvedDataType(RAW(Object.class))
 				.expectUnresolvedString("[RAW('java.lang.Object', '?')]")
 				.lookupReturns(DataTypes.RAW(new GenericTypeInfo<>(Object.class)))
-				.expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class)))
+				.expectResolvedDataType(DataTypes.RAW(new GenericTypeInfo<>(Object.class))),
+
+			TestSpec
+				.forUnresolvedDataType(DataTypes.of(SimplePojo.class))
+				.expectResolvedDataType(
+					DataTypes.STRUCTURED(
+						SimplePojo.class,
+						DataTypes.FIELD("name", DataTypes.STRING()),
+						DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class))))
 		);
 	}
 
@@ -475,4 +483,21 @@ public class DataTypesTest {
 			return abstractDataType.toString();
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Simple POJO for testing.
+	 */
+	public static class SimplePojo {
+		public final String name;
+		public final int count;
+
+		public SimplePojo(String name, int count) {
+			this.name = name;
+			this.count = count;
+		}
+	}
 }
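
As a closing illustration of the new DataTypes.STRUCTURED factory method, a usage sketch that mirrors the SimplePojo expectation in DataTypesTest above (the User class below is hypothetical and not part of the commit):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class StructuredUsageSketch {

    /** Simple POJO with a fully assigning constructor, as required by the Javadoc above. */
    public static class User {
        public final String name;
        public final int count;

        public User(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    public static void main(String[] args) {
        // anonymous, unregistered structured type backed by the User class;
        // the conversion classes of the fields must match the attributes of User
        final DataType userType = DataTypes.STRUCTURED(
            User.class,
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("count", DataTypes.INT().notNull().bridgedTo(int.class)));

        System.out.println(userType);
    }
}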