Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/22 04:21:44 UTC

[GitHub] HuangZhenQiu closed pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

HuangZhenQiu closed pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
URL: https://github.com/apache/flink/pull/6483
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.gitignore b/.gitignore
index 20749c24242..fdf7bedfb26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ tmp
 build-target
 flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/avro/
 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/
+flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/
 flink-runtime-web/web-dashboard/assets/fonts/
 flink-runtime-web/web-dashboard/node_modules/
 flink-runtime-web/web-dashboard/bower_components/
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..4177af72318 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -233,6 +233,16 @@ protected static String extractFileExtension(String fileName) {
 	 */
 	protected boolean enumerateNestedFiles = false;
 
+	/**
+	 * The flag to specify whether to skip file splits whose schema does not match the expected schema.
+	 */
+	protected boolean skipWrongSchemaFileSplit = false;
+
+	/**
+	 * The flag to specify whether to skip corrupted records.
+	 */
+	protected boolean skipCorruptedRecord = true;
+
 	/**
 	 * Files filter for determining what files/directories should be included.
 	 */
@@ -463,6 +473,14 @@ public void configure(Configuration parameters) {
 		if (!this.enumerateNestedFiles) {
 			this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
 		}
+
+		if (!this.skipWrongSchemaFileSplit) {
+			this.skipWrongSchemaFileSplit = parameters.getBoolean(SKIP_WRONG_SCHEMA_SPLITS, false);
+		}
+
+		if (this.skipCorruptedRecord) {
+			this.skipCorruptedRecord = parameters.getBoolean(SKIP_CORRUPTED_RECORD, true);
+		}
 	}
 
 	/**
@@ -1077,4 +1095,14 @@ private void abortWait() {
 	 * The config parameter which defines whether input directories are recursively traversed.
 	 */
 	public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration";
+
+	/**
+	 * The config parameter which defines whether to skip file splits with a mismatching schema.
+	 */
+	public static final String SKIP_WRONG_SCHEMA_SPLITS = "skip.splits.wrong.schema";
+
+	/**
+	 * The config parameter which defines whether to skip corrupted records.
+	 */
+	public static final String SKIP_CORRUPTED_RECORD = "skip.corrupted.record";
 }
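
For illustration only (this note and the snippet below are not part of the patch): a minimal sketch of how the two new configuration keys could be set by a job. The class name is made up, and handing the Configuration to the format via withParameters() in the DataSet API is an assumption about typical wiring.

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.configuration.Configuration;

public class SkipFlagsExample {
	public static Configuration buildParameters() {
		Configuration parameters = new Configuration();
		// tolerate file splits whose schema does not match the expected schema
		parameters.setBoolean(FileInputFormat.SKIP_WRONG_SCHEMA_SPLITS, true);
		// fail on corrupted records instead of skipping them
		parameters.setBoolean(FileInputFormat.SKIP_CORRUPTED_RECORD, false);
		// the parameters reach FileInputFormat#configure(), e.g. via env.createInput(format).withParameters(parameters)
		return parameters;
	}
}
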
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
new file mode 100644
index 00000000000..dab1899a1ce
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method needs to be implemented.
+ *
+ * <p>Because it uses {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream},
+ * it overrides {@link #open(FileInputSplit)} and {@link #close()} to change the default behavior.
+ *
+ * @param <E> The type of record to read.
+ */
+public abstract class ParquetInputFormat<E>
+	extends FileInputFormat<E>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class);
+
+	private transient Counter recordConsumed;
+
+	private transient MessageType expectedFileSchema;
+
+	private TypeInformation[] fieldTypes;
+
+	private String[] fieldNames;
+
+	private boolean skipThisSplit = false;
+
+	private transient ParquetRecordReader<Row> parquetRecordReader;
+
+
+	/**
+	 * Read parquet files with given parquet file schema.
+	 *
+	 * @param path The path of the file to read.
+	 * @param messageType schema of parquet file
+	 */
+
+	protected ParquetInputFormat(Path path, MessageType messageType) {
+		super(path);
+		this.expectedFileSchema = checkNotNull(messageType, "messageType");
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema);
+		this.fieldTypes = rowTypeInfo.getFieldTypes();
+		this.fieldNames = rowTypeInfo.getFieldNames();
+		// read whole parquet file as one file split
+		this.unsplittable = true;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		super.configure(parameters);
+		parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);
+	}
+
+	public void selectFields(String[] fieldNames) {
+		checkNotNull(fieldNames, "fieldNames");
+		this.fieldNames = fieldNames;
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema);
+		TypeInformation[] selectFieldTypes = new TypeInformation[fieldNames.length];
+		for (int i = 0; i < fieldNames.length; i++) {
+			selectFieldTypes[i] = rowTypeInfo.getTypeAt(fieldNames[i]);
+		}
+		this.fieldTypes = selectFieldTypes;
+	}
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() {
+		return new Tuple2<>(this.parquetRecordReader.getCurrentBlock(),
+			this.parquetRecordReader.getRecordInCurrentBlock());
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+		MessageType schema = fileReader.getFileMetaData().getSchema();
+		// reset the skip flag before deriving the read schema; getReadSchema() may mark this split to be skipped
+		this.skipThisSplit = false;
+		MessageType readSchema = getReadSchema(schema);
+		this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+		this.parquetRecordReader.initialize(fileReader, configuration);
+		if (this.recordConsumed == null) {
+			this.recordConsumed = getRuntimeContext().getMetricGroup().counter("parquet-records-consumed");
+		}
+
+		LOG.debug("Opened ParquetInputFormat with FileInputSplit [{}]", split.getPath());
+		if (skipThisSplit) {
+			LOG.warn(
+				"Skipping the file split [{}] because its schema does not match the expected read schema",
+				split.getPath());
+		}
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		try {
+			this.open(split);
+		} finally {
+			if (state.f0 != -1) {
+				// open and read until the record we were at before
+				// the checkpoint and discard the values
+				parquetRecordReader.seek(state.f0, state.f1);
+			}
+		}
+	}
+
+	/**
+	 * Get field names of read result.
+	 *
+	 * @return field names array
+	 */
+	protected String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	/**
+	 * Get field types of read result.
+	 *
+	 * @return field types array
+	 */
+	protected TypeInformation[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (parquetRecordReader != null) {
+			parquetRecordReader.close();
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if (skipThisSplit) {
+			return true;
+		}
+		return parquetRecordReader.reachEnd();
+	}
+
+	@Override
+	public E nextRecord(E e) throws IOException {
+		if (reachedEnd()) {
+			return null;
+		}
+
+		if (parquetRecordReader.hasNextRecord()) {
+			recordConsumed.inc();
+			return convert(parquetRecordReader.nextRecord());
+		}
+
+		LOG.warn("Tried to read the next record at the end of a split. This should not happen!");
+		return null;
+	}
+
+	/**
+	 * This ParquetInputFormat reads parquet records as {@link Row} by default. Subclasses implement this method
+	 * to further convert the row to other types, such as POJO, Map or Tuple.
+	 *
+	 * @param row row read from parquet file
+	 * @return E target result type
+	 */
+	protected abstract E convert(Row row);
+
+	private MessageType getReadSchema(MessageType schema) {
+		RowTypeInfo rootTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(schema);
+		List<Type> types = new ArrayList<>();
+		for (int i = 0; i < fieldNames.length; ++i) {
+			String readFieldName = fieldNames[i];
+			TypeInformation<?> readFieldType = fieldTypes[i];
+			if (rootTypeInfo.getFieldIndex(readFieldName) < 0) {
+				if (!skipWrongSchemaFileSplit) {
+					throw new IllegalArgumentException(readFieldName + " can not be found in parquet schema");
+				} else {
+					this.skipThisSplit = true;
+					return schema;
+				}
+			}
+
+			if (!readFieldType.equals(rootTypeInfo.getTypeAt(readFieldName))) {
+				if (!skipWrongSchemaFileSplit) {
+					throw new IllegalArgumentException(readFieldName + " can not be converted to " + readFieldType);
+				} else {
+					this.skipThisSplit = true;
+					return schema;
+				}
+			}
+			types.add(schema.getType(readFieldName));
+		}
+
+		return new MessageType(schema.getName(), types);
+	}
+}
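
For illustration only (not part of the patch): ParquetInputFormat is abstract, so a subclass only needs to supply convert(Row). Below is a minimal sketch of such a subclass that returns the materialized row unchanged; the class name is made up, and ParquetRowInputFormat further below is the real equivalent shipped with this change.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetInputFormat;
import org.apache.flink.types.Row;

import org.apache.parquet.schema.MessageType;

public class PassThroughParquetInputFormat extends ParquetInputFormat<Row> {

	public PassThroughParquetInputFormat(Path path, MessageType messageType) {
		super(path, messageType);
	}

	@Override
	protected Row convert(Row row) {
		// no further conversion; hand out the row produced by the record reader
		return row;
	}
}
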
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java
new file mode 100644
index 00000000000..18ce0a65529
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Map} type.
+ * It is mainly used to read complex data types with nested fields.
+ */
+public class ParquetMapInputFormat extends ParquetInputFormat<Map> {
+
+	public ParquetMapInputFormat(Path path, MessageType messageType) {
+		super(path, messageType);
+	}
+
+	@Override
+	protected Map convert(Row row) {
+		Map<String, Object> map = new HashMap<>();
+		convert(map, row, getFieldTypes(), getFieldNames());
+		return map;
+	}
+
+	@SuppressWarnings("unchecked")
+	private void convert(Map<String, Object> map, Row row, TypeInformation<?>[] fieldTypes, String[] fieldNames) {
+		for (int i = 0; i < fieldNames.length; i++) {
+			if (row.getField(i) != null) {
+				if (fieldTypes[i] instanceof BasicTypeInfo
+					|| fieldTypes[i] instanceof PrimitiveArrayTypeInfo
+					|| fieldTypes[i] instanceof BasicArrayTypeInfo) {
+					map.put(fieldNames[i], row.getField(i));
+				} else if (fieldTypes[i] instanceof RowTypeInfo) {
+					Map<String, Object> nestedRow = new HashMap<>();
+					RowTypeInfo nestedRowTypeInfo = (RowTypeInfo) fieldTypes[i];
+					convert(nestedRow, (Row) row.getField(i),
+						nestedRowTypeInfo.getFieldTypes(), nestedRowTypeInfo.getFieldNames());
+					map.put(fieldNames[i], nestedRow);
+				} else if (fieldTypes[i] instanceof MapTypeInfo) {
+					Map<String, Object> nestedMap = new HashMap<>();
+					MapTypeInfo mapTypeInfo = (MapTypeInfo) fieldTypes[i];
+					convert(nestedMap, (Map<String, Object>) row.getField(i), mapTypeInfo);
+					map.put(fieldNames[i], nestedMap);
+				} else if (fieldTypes[i] instanceof ObjectArrayTypeInfo) {
+					List<Object> nestedObjectList = new ArrayList<>();
+					ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) fieldTypes[i];
+					convert(nestedObjectList, (Row[]) row.getField(i), objectArrayTypeInfo);
+					map.put(fieldNames[i], nestedObjectList);
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void convert(Map<String, Object> target, Map<String, Object> source, MapTypeInfo mapTypeInfo) {
+		TypeInformation valueTypeInfo = mapTypeInfo.getValueTypeInfo();
+
+		for (String key : source.keySet()) {
+			if (valueTypeInfo instanceof RowTypeInfo) {
+				Map<String, Object> nestedRow = new HashMap<>();
+				convert(nestedRow, (Row) source.get(key),
+					((RowTypeInfo) valueTypeInfo).getFieldTypes(), ((RowTypeInfo) valueTypeInfo).getFieldNames());
+				target.put(key, nestedRow);
+			} else if (valueTypeInfo instanceof MapTypeInfo) {
+				Map<String, Object> nestedMap = new HashMap<>();
+				convert(nestedMap, (Map<String, Object>) source.get(key), (MapTypeInfo) valueTypeInfo);
+				target.put(key, nestedMap);
+			} else if (valueTypeInfo instanceof ObjectArrayTypeInfo) {
+				List<Object> nestedObjectList = new ArrayList<>();
+				convert(nestedObjectList, (Object[]) source.get(key), (ObjectArrayTypeInfo) valueTypeInfo);
+				target.put(key, nestedObjectList);
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void convert(List<Object> target, Object[] source, ObjectArrayTypeInfo objectArrayTypeInfo) {
+		TypeInformation<?> itemType = objectArrayTypeInfo.getComponentInfo();
+		for (int i = 0; i < source.length; i++) {
+			if (itemType instanceof RowTypeInfo) {
+				Map<String, Object> nestedRow = new HashMap<>();
+				convert(nestedRow, (Row) source[i],
+					((RowTypeInfo) itemType).getFieldTypes(), ((RowTypeInfo) itemType).getFieldNames());
+				target.add(nestedRow);
+			} else if (itemType instanceof MapTypeInfo) {
+				Map<String, Object> nestedMap = new HashMap<>();
+				MapTypeInfo mapTypeInfo = (MapTypeInfo) itemType;
+				convert(nestedMap, (Map<String, Object>) source[i], mapTypeInfo);
+				target.add(nestedMap);
+			} else if (itemType instanceof ObjectArrayTypeInfo) {
+				List<Object> nestedObjectList = new ArrayList<>();
+				convert(nestedObjectList, (Row[]) source[i], (ObjectArrayTypeInfo) itemType);
+				target.add(nestedObjectList);
+			}
+		}
+
+	}
+}
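
For illustration only (not part of the patch): a sketch of wiring ParquetMapInputFormat into a DataSet job. The schema string and file path are made-up examples.

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetMapInputFormat;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetMapExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// made-up nested schema and path
		MessageType schema = MessageTypeParser.parseMessageType(
			"message example { required int64 id; optional group address { optional binary city (UTF8); } }");
		ParquetMapInputFormat format =
			new ParquetMapInputFormat(new Path("file:///tmp/nested.parquet"), schema);

		// the format is not ResultTypeQueryable, so the produced type is given explicitly
		DataSet<Map> records = env.createInput(format, TypeInformation.of(Map.class));
		records.print();
	}
}
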
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java
new file mode 100644
index 00000000000..944f72791c2
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to POJO types.
+ * It is mainly used to read simple data types without nested fields.
+ */
+public class ParquetPojoInputFormat<E> extends ParquetInputFormat<E> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetPojoInputFormat.class);
+	private final Class<E> pojoTypeClass;
+	private final TypeSerializer<E> typeSerializer;
+	private transient Field[] pojoFields;
+
+	public ParquetPojoInputFormat(Path filePath, MessageType messageType, PojoTypeInfo<E> pojoTypeInfo) {
+		super(filePath, messageType);
+		this.pojoTypeClass = pojoTypeInfo.getTypeClass();
+		this.typeSerializer = pojoTypeInfo.createSerializer(new ExecutionConfig());
+		final Map<String, Field> fieldMap = new HashMap<>();
+		findAllFields(pojoTypeClass, fieldMap);
+		selectFields(fieldMap.keySet().toArray(new String[fieldMap.size()]));
+	}
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		pojoFields = new Field[getFieldNames().length];
+
+		final Map<String, Field> fieldMap = new HashMap<>();
+		findAllFields(pojoTypeClass, fieldMap);
+
+		for (int i = 0; i < getFieldNames().length; ++i) {
+			String fieldName = getFieldNames()[i];
+			pojoFields[i] = fieldMap.get(fieldName);
+
+			if (pojoFields[i] != null) {
+				pojoFields[i].setAccessible(true);
+			} else {
+				throw new RuntimeException(
+					String.format("There is no field called %s in %s", fieldName, pojoTypeClass.getName()));
+			}
+		}
+	}
+
+	private void findAllFields(Class<?> clazz, Map<String, Field> fieldMap) {
+
+		for (Field field : clazz.getDeclaredFields()) {
+			fieldMap.put(field.getName(), field);
+		}
+
+		if (clazz.getSuperclass() != null) {
+			findAllFields(clazz.getSuperclass(), fieldMap);
+		}
+	}
+
+	@Override
+	protected E convert(Row row) {
+		E result = typeSerializer.createInstance();
+		for (int i = 0; i < row.getArity(); ++i) {
+			try {
+				if (pojoFields[i].getType().isAssignableFrom(List.class)) {
+					pojoFields[i].set(result, Arrays.asList(row.getField(i)));
+				} else {
+					pojoFields[i].set(result, row.getField(i));
+				}
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException(
+					String.format("Parsed value could not be set in POJO field %s", getFieldNames()[i]));
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Extracts the {@link TypeInformation}s from {@link PojoTypeInfo} according to the given field names.
+	 */
+	private static <E> TypeInformation<?>[] extractTypeInfos(PojoTypeInfo<E> pojoTypeInfo, String[] fieldNames) {
+		Preconditions.checkNotNull(pojoTypeInfo);
+		Preconditions.checkNotNull(fieldNames);
+		Preconditions.checkArgument(pojoTypeInfo.getArity() >= fieldNames.length);
+		TypeInformation<?>[] fieldTypes = new TypeInformation<?>[fieldNames.length];
+		for (int i = 0; i < fieldNames.length; ++i) {
+			String fieldName = fieldNames[i];
+			Preconditions.checkNotNull(fieldName, "The field can't be null");
+			int fieldPos = pojoTypeInfo.getFieldIndex(fieldName);
+			Preconditions.checkArgument(fieldPos >= 0,
+				String.format("Field %s is not a member of POJO type %s",
+					fieldName, pojoTypeInfo.getTypeClass().getName()));
+			fieldTypes[i] = pojoTypeInfo.getTypeAt(fieldPos);
+		}
+
+		return fieldTypes;
+	}
+}
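
For illustration only (not part of the patch): a sketch of building a ParquetPojoInputFormat for a simple POJO. The Event class, schema string and path are made up.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetPojoInputFormat;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetPojoExample {

	/** Made-up POJO matching the parquet schema below. */
	public static class Event {
		public long id;
		public String name;
	}

	public static ParquetPojoInputFormat<Event> createFormat() {
		MessageType schema = MessageTypeParser.parseMessageType(
			"message event { required int64 id; optional binary name (UTF8); }");
		// for a proper POJO, TypeInformation.of() yields a PojoTypeInfo
		PojoTypeInfo<Event> pojoTypeInfo = (PojoTypeInfo<Event>) TypeInformation.of(Event.class);
		return new ParquetPojoInputFormat<>(new Path("file:///tmp/events.parquet"), schema, pojoTypeInfo);
	}
}
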
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
new file mode 100644
index 00000000000..f8cd28bfba8
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}.
+ * It is mainly used to integrate with the Table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
+	private static final long serialVersionUID = 11L;
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRowInputFormat.class);
+	private boolean timeStampRewrite;
+	private RowTypeInfo returnType;
+	private int tsIndex;
+
+	public ParquetRowInputFormat(Path path, MessageType messageType) {
+		super(path, messageType);
+		this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
+		this.timeStampRewrite = false;
+	}
+
+	@Override
+	public TypeInformation<Row> getProducedType() {
+		return new RowTypeInfo(getFieldTypes(), getFieldNames());
+	}
+
+	@Override
+	protected Row convert(Row row) {
+		if (timeStampRewrite) {
+			row.setField(tsIndex, new Timestamp((long) row.getField(tsIndex)));
+		}
+		return row;
+	}
+
+	/**
+	 * Converts a long or double field in the parquet schema to SqlTimeTypeInfo.TIMESTAMP, so that the returned
+	 * row can be used directly for window aggregation in the Table API. Rewriting the timestamp field requires
+	 * overriding the {@link #convert(Row)} function accordingly.
+	 *
+	 * @param fieldName the field that needs to be changed to TIMESTAMP type
+	 */
+	public void rewriteTimeStampField(String fieldName) {
+		this.tsIndex = returnType.getFieldIndex(fieldName);
+		if (tsIndex == -1) {
+			throw new RuntimeException(String.format("Fail to extract timestamp field for row schema: [%s]",
+				returnType.toString()));
+		}
+
+		this.returnType.getFieldTypes()[tsIndex] = SqlTimeTypeInfo.TIMESTAMP;
+		this.timeStampRewrite = true;
+		LOG.debug("Read parquet record as row type: {}", returnType.toString());
+	}
+
+	public boolean isTimeStampRewrite() {
+		return timeStampRewrite;
+	}
+}
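
For illustration only (not part of the patch): a sketch of preparing the row format for Table API windowing with rewriteTimeStampField(). The field name, schema string and path are made up.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetRowExample {
	public static ParquetRowInputFormat createFormat() {
		MessageType schema = MessageTypeParser.parseMessageType(
			"message event { required int64 event_time; optional binary name (UTF8); }");
		ParquetRowInputFormat format =
			new ParquetRowInputFormat(new Path("file:///tmp/events.parquet"), schema);
		// expose event_time as SqlTimeTypeInfo.TIMESTAMP so it can drive window aggregations
		format.rewriteTimeStampField("event_time");
		return format;
	}
}
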
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java
new file mode 100644
index 00000000000..428fd3913c9
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+/**
+ * Interface used by {@link RowConverter} to collect nested values extracted from a parquet record.
+ */
+public interface ParentDataHolder {
+
+	void add(int fieldIndex, Object object);
+}
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
new file mode 100644
index 00000000000..46d5e9f8d6b
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.CheckReturnValue;
+import javax.annotation.meta.When;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that supports starting the read from a particular position.
+ */
+public class ParquetRecordReader<T> {
+	private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class);
+
+	private ColumnIOFactory columnIOFactory;
+	private Filter filter;
+	private MessageType readSchema;
+	private MessageType fileSchema;
+	private ReadSupport<T> readSupport;
+
+	// Parquet materializer that converts a record to T
+	private RecordMaterializer<T> recordMaterializer;
+	private T currentValue;
+	private long total;
+	private long current = 0;
+	private int currentBlock = -1;
+	private ParquetFileReader reader;
+	private RecordReader<T> recordReader;
+	private boolean strictTypeChecking = true;
+	private boolean skipCorruptedRecord = true;
+	private long countLoadUntilLastGroup = 0;
+	private long totalCountLoadedSoFar = 0;
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema, Filter filter) {
+		this.filter = checkNotNull(filter, "filter");
+		this.readSupport = checkNotNull(readSupport, "readSupport");
+		this.readSchema = checkNotNull(readSchema, "readSchema");
+	}
+
+	public ParquetRecordReader(ReadSupport<T> readSupport, MessageType readSchema) {
+		this(readSupport, readSchema, FilterCompat.NOOP);
+	}
+
+	public void initialize(ParquetFileReader reader, Configuration configuration) {
+		this.reader = reader;
+		FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+		// real schema of parquet file
+		this.fileSchema = parquetFileMetadata.getSchema();
+		Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+		ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+			configuration, toSetMultiMap(fileMetadata), readSchema));
+
+		this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+		this.recordMaterializer = readSupport.prepareForRead(
+			configuration, fileMetadata, readSchema, readContext);
+		this.total = reader.getRecordCount();
+	}
+
+	public void close() throws IOException {
+		if (reader != null) {
+			reader.close();
+		}
+	}
+
+	public void setSkipCorruptedRecord(boolean skipCorruptedRecord) {
+		this.skipCorruptedRecord = skipCorruptedRecord;
+	}
+
+	@CheckReturnValue(when = When.NEVER)
+	public T nextRecord() {
+		return currentValue;
+	}
+
+	public void seek(long syncedBlock, long recordsReadSinceLastSync) throws IOException {
+		List<BlockMetaData> blockMetaData = reader.getRowGroups();
+		while (syncedBlock > 0) {
+			currentBlock++;
+			reader.skipNextRowGroup();
+			countLoadUntilLastGroup = totalCountLoadedSoFar;
+			totalCountLoadedSoFar += blockMetaData.get(currentBlock).getRowCount();
+			syncedBlock--;
+		}
+
+		for (int i = 0; i < recordsReadSinceLastSync; i++) {
+			// skip the records already processed
+			if (hasNextRecord()) {
+				nextRecord();
+			}
+		}
+
+	}
+
+	private RecordReader<T> createRecordReader(PageReadStore pages) throws IOException {
+		if (pages == null) {
+			throw new IOException("Expecting more rows but reached last block. Read " + current + " out of " + total);
+		}
+		MessageColumnIO columnIO = columnIOFactory.getColumnIO(readSchema, fileSchema, strictTypeChecking);
+		RecordReader<T> recordReader = columnIO.getRecordReader(pages, recordMaterializer, filter);
+		return recordReader;
+	}
+
+	public long getCurrentBlock() {
+		return currentBlock;
+	}
+
+	public long getRecordInCurrentBlock() {
+		if (currentBlock == 0) {
+			return current;
+		} else {
+			return current - countLoadUntilLastGroup;
+		}
+	}
+
+	public boolean reachEnd() {
+		return current >= total;
+	}
+
+	public boolean hasNextRecord() throws IOException {
+		boolean recordFound = false;
+		while (!recordFound) {
+			// no more records left
+			if (current >= total) {
+				return false;
+			}
+
+			try {
+				if (current == totalCountLoadedSoFar) {
+					PageReadStore pages = reader.readNextRowGroup();
+					recordReader = createRecordReader(pages);
+					countLoadUntilLastGroup = totalCountLoadedSoFar;
+					totalCountLoadedSoFar += pages.getRowCount();
+					currentBlock++;
+				}
+
+				current++;
+				try {
+					currentValue = recordReader.read();
+				} catch (RecordMaterializationException e) {
+					String errorMessage = String.format("skipping a corrupt record in block number [%d], record "
+						+ "number [%d] of file %s", currentBlock, current - countLoadUntilLastGroup, reader.getFile());
+
+					LOG.error(errorMessage);
+					if (!skipCorruptedRecord) {
+						throw new RuntimeException(errorMessage, e);
+					}
+					continue;
+				}
+
+				if (currentValue == null) {
+					current = totalCountLoadedSoFar;
+					LOG.debug("filtered record reader reached end of block");
+					continue;
+				}
+
+				recordFound = true;
+				LOG.debug("read value: {}", currentValue);
+			} catch (RuntimeException e) {
+				LOG.error(String.format("Cannot read value at %d in block %d in file %s",
+					current - countLoadUntilLastGroup, currentBlock, reader.getFile()), e);
+				if (!skipCorruptedRecord) {
+					throw e;
+				}
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+		Map<K, Set<V>> setMultiMap = new HashMap<>();
+		for (Map.Entry<K, V> entry : map.entrySet()) {
+			Set<V> set = Collections.singleton(entry.getValue());
+			setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+		}
+		return Collections.unmodifiableMap(setMultiMap);
+	}
+}
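
For illustration only (not part of the patch): the block and record counters above are what ParquetInputFormat checkpoints through getCurrentState() and replays through reopen()/seek(). A small sketch of that contract, using the row format as an example; the wrapper class and its method names are made up, the calls themselves come from the classes in this patch.

import java.io.IOException;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.formats.parquet.ParquetRowInputFormat;

public class CheckpointRestoreSketch {

	/** Snapshot: the current row group and how many records of it have already been emitted. */
	static Tuple2<Long, Long> snapshot(ParquetRowInputFormat format) {
		return format.getCurrentState();
	}

	/** Restore: reopen the split and seek past the records emitted before the checkpoint. */
	static void restore(ParquetRowInputFormat format, FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
		format.reopen(split, state);
	}
}
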
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
new file mode 100644
index 00000000000..f7d67a059ce
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+	public static final String MAP_KEY = "key";
+	public static final String MAP_VALUE = "value";
+	public static final String LIST_ARRAY_TYPE = "array";
+	public static final String LIST_ELEMENT = "element";
+	public static final String LIST_GROUP_NAME = "list";
+	public static final String MESSAGE_ROOT = "root";
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+	public static TypeInformation<?> fromParquetType(MessageType type) {
+		return convertFields(type.getFields());
+	}
+
+	/**
+	 * Converts Flink Internal Type to Parquet schema.
+	 *
+	 * @param typeInformation flink type information
+	 * @param isStandard whether to use the standard LIST and MAP schema or the backward-compatible schema
+	 * @return Parquet schema
+	 */
+	public static MessageType toParquetType(TypeInformation<?> typeInformation, boolean isStandard) {
+		return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL, isStandard);
+	}
+
+	private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+		List<TypeInformation<?>> types = new ArrayList<>();
+		List<String> names = new ArrayList<>();
+		for (Type field : parquetFields) {
+			TypeInformation<?> subType = convertField(field);
+			if (subType != null) {
+				types.add(subType);
+				names.add(field.getName());
+			}
+		}
+
+		return new RowTypeInfo(types.toArray(new TypeInformation<?>[types.size()]),
+			names.toArray(new String[names.size()]));
+	}
+
+	private static TypeInformation<?> convertField(final Type fieldType) {
+		TypeInformation<?> typeInfo = null;
+		if (fieldType.isPrimitive()) {
+			OriginalType originalType = fieldType.getOriginalType();
+			PrimitiveType primitiveType = fieldType.asPrimitiveType();
+			switch (primitiveType.getPrimitiveTypeName()) {
+				case BINARY:
+					if (originalType != null) {
+						switch (originalType) {
+							case DECIMAL:
+								typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+								break;
+							case UTF8:
+							case ENUM:
+							case JSON:
+							case BSON:
+								typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+								break;
+							default:
+								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+									+ " for primitive type BINARY");
+						}
+					} else {
+						typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+					}
+					break;
+				case BOOLEAN:
+					typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+					break;
+				case INT32:
+					if (originalType != null) {
+						switch (originalType) {
+							case TIME_MICROS:
+							case TIME_MILLIS:
+								typeInfo = SqlTimeTypeInfo.TIME;
+								break;
+							case TIMESTAMP_MICROS:
+							case TIMESTAMP_MILLIS:
+								typeInfo = SqlTimeTypeInfo.TIMESTAMP;
+								break;
+							case DATE:
+								typeInfo = SqlTimeTypeInfo.DATE;
+								break;
+							case UINT_8:
+							case UINT_16:
+							case UINT_32:
+							case INT_8:
+							case INT_16:
+							case INT_32:
+								typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+								break;
+							default:
+								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+									+ " for primitive type INT32");
+						}
+					} else {
+						typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+					}
+					break;
+				case INT64:
+					if (originalType != null) {
+						switch (originalType) {
+							case TIME_MICROS:
+								typeInfo = SqlTimeTypeInfo.TIME;
+								break;
+							case TIMESTAMP_MICROS:
+							case TIMESTAMP_MILLIS:
+								typeInfo = SqlTimeTypeInfo.TIMESTAMP;
+								break;
+							case INT_64:
+							case DECIMAL:
+								typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
+								break;
+							default:
+								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+									+ " for primitive type INT64");
+						}
+					} else {
+						typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
+					}
+					break;
+				case INT96:
+					// INT96 stores timestamp data; we read it as milliseconds
+					typeInfo = SqlTimeTypeInfo.TIMESTAMP;
+					break;
+				case FLOAT:
+					typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
+					break;
+				case DOUBLE:
+					typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
+					break;
+				case FIXED_LEN_BYTE_ARRAY:
+					if (originalType != null) {
+						switch (originalType) {
+							case DECIMAL:
+								typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+								break;
+							default:
+								throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+									+ " for primitive type FIXED_LEN_BYTE_ARRAY");
+						}
+					} else {
+						typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+					}
+					break;
+				default:
+					throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
+			}
+		} else {
+			GroupType parquetGroupType = fieldType.asGroupType();
+			OriginalType originalType = parquetGroupType.getOriginalType();
+			if (originalType != null) {
+				switch (originalType) {
+					case LIST:
+						if (parquetGroupType.getFieldCount() != 1) {
+							throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+						}
+						Type repeatedType = parquetGroupType.getType(0);
+						if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
+							throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+						}
+
+						if (repeatedType.isPrimitive()) {
+							// For backward compatibility, the element group may not exist, which is also allowed
+							typeInfo = BasicArrayTypeInfo.getInfoFor(
+								Array.newInstance(convertField(repeatedType).getTypeClass(), 0).getClass());
+						} else {
+							// For backward compatibility, the element group name can be any string (element/array/other)
+							GroupType elementType = repeatedType.asGroupType();
+							if (elementType.getFieldCount() > 1) {
+								typeInfo = ObjectArrayTypeInfo.getInfoFor(convertField(elementType));
+							} else {
+								Type internalType = elementType.getType(0);
+								if (internalType.isPrimitive()) {
+									typeInfo = BasicArrayTypeInfo.getInfoFor(
+										Array.newInstance(convertField(internalType).getTypeClass(),
+											0).getClass());
+								} else {
+									typeInfo = ObjectArrayTypeInfo.getInfoFor(convertField(internalType));
+								}
+							}
+						}
+						break;
+
+					case MAP_KEY_VALUE:
+					case MAP:
+						if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
+							throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+						}
+
+						GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+						if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
+							|| mapKeyValType.getFieldCount() != 2) {
+							throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+						}
+						Type keyType = mapKeyValType.getType(0);
+						if (!keyType.isPrimitive()
+							|| !keyType.asPrimitiveType().getPrimitiveTypeName().equals(
+								PrimitiveType.PrimitiveTypeName.BINARY)
+							|| !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+							throw new IllegalArgumentException("Map key type must be binary (UTF8): "
+								+ keyType);
+						}
+
+						Type valueType = mapKeyValType.getType(1);
+						return new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, convertField(valueType));
+					default:
+						throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
+				}
+			} else {
+				// if there is no original type, then it is a record
+				return convertFields(parquetGroupType.getFields());
+			}
+		}
+
+		return typeInfo;
+	}
+
+	private static Type convertField(String fieldName, TypeInformation<?> typeInfo,
+									Type.Repetition inheritRepetition, boolean isStandard) {
+		Type fieldType = null;
+
+		Type.Repetition repetition = inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
+		if (typeInfo.isBasicType()) {
+			BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo;
+			if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition).named(fieldName);
+			} else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
+				|| basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
+				fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
+					.as(OriginalType.UTF8)
+					.named(fieldName);
+			}
+		} else if (typeInfo instanceof MapTypeInfo) {
+			MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+			fieldType = Types.optionalMap()
+				.key(convertField(MAP_KEY, mapTypeInfo.getKeyTypeInfo(), Type.Repetition.REQUIRED, isStandard))
+				.value(convertField(MAP_VALUE, mapTypeInfo.getValueTypeInfo(), Type.Repetition.OPTIONAL, isStandard))
+				.named(fieldName);
+		} else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
+			fieldType = Types.optionalGroup()
+				.addField(convertField(LIST_ELEMENT, objectArrayTypeInfo.getComponentInfo(),
+					Type.Repetition.REPEATED, isStandard))
+				.as(OriginalType.LIST)
+				.named(fieldName);
+		} else if (typeInfo instanceof BasicArrayTypeInfo) {
+			BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;
+
+			if (isStandard) {
+
+				// Add extra layer of Group according to Parquet's standard
+				Type listGroup = Types.repeatedGroup().addField(
+					convertField(LIST_ELEMENT, basicArrayType.getComponentInfo(),
+						Type.Repetition.OPTIONAL, isStandard)).named(LIST_GROUP_NAME);
+
+				fieldType = Types.optionalGroup()
+					.addField(listGroup)
+					.as(OriginalType.LIST).named(fieldName);
+			} else {
+				PrimitiveType primitiveType =
+					convertField(fieldName, basicArrayType.getComponentInfo(),
+						Type.Repetition.OPTIONAL, isStandard).asPrimitiveType();
+				fieldType = Types.optionalGroup()
+					.repeated(primitiveType.getPrimitiveTypeName())
+					.as(primitiveType.getOriginalType())
+					.named(LIST_ARRAY_TYPE)
+					.as(OriginalType.LIST).named(fieldName);
+			}
+		} else {
+			RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+			List<Type> types = new ArrayList<>();
+			String[] fieldNames = rowTypeInfo.getFieldNames();
+			TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
+			for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+				types.add(convertField(fieldNames[i], fieldTypes[i], Type.Repetition.OPTIONAL, isStandard));
+			}
+
+			if (fieldName == null) {
+				fieldType = new MessageType(MESSAGE_ROOT, types);
+			} else {
+				fieldType = new GroupType(repetition, fieldName, types);
+			}
+		}
+
+		return fieldType;
+	}
+
+	private boolean isElementType(Type repeatedType, String parentName) {
+		return (
+			// can't be a synthetic layer because it would be invalid
+			repeatedType.isPrimitive()
+				|| repeatedType.asGroupType().getFieldCount() > 1
+				|| repeatedType.asGroupType().getType(0).isRepetition(Type.Repetition.REPEATED)
+				// known patterns without the synthetic layer
+				|| repeatedType.getName().equals("array")
+				|| repeatedType.getName().equals(parentName + "_tuple"));
+	}
+}
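
For illustration only (not part of the patch): a round-trip sketch between a Flink RowTypeInfo and a Parquet MessageType using the two public entry points above; the field names are arbitrary.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;

import org.apache.parquet.schema.MessageType;

public class SchemaRoundTripExample {
	public static void main(String[] args) {
		RowTypeInfo rowType = new RowTypeInfo(
			new TypeInformation<?>[] {BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
			new String[] {"id", "name"});

		// Flink type -> Parquet schema, using the standard LIST/MAP layout
		MessageType parquetSchema = ParquetSchemaConverter.toParquetType(rowType, true);

		// Parquet schema -> Flink type (a RowTypeInfo again)
		TypeInformation<?> restored = ParquetSchemaConverter.fromParquetType(parquetSchema);
		System.out.println(parquetSchema + "\n" + restored);
	}
}
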
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java
new file mode 100644
index 00000000000..c58e28b2bdf
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.parquet.io.api.Binary;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class for decoding INT96 encoded parquet timestamp to timestamp millis in GMT.
+ * This class is equivalent to {@code org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime},
+ * but produces fewer intermediate objects during decoding.
+ */
+public final class ParquetTimestampUtils {
+	private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+	private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+	private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+
+	private ParquetTimestampUtils() {}
+
+	/**
+	 * Returns GMT timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
+	 *
+	 * @param timestampBinary INT96 parquet timestamp
+	 * @return timestamp in millis, GMT timezone
+	 */
+	public static long getTimestampMillis(Binary timestampBinary) {
+		if (timestampBinary.length() != 12) {
+			throw new IllegalArgumentException("Parquet timestamp must be 12 bytes, actual " + timestampBinary.length());
+		}
+		byte[] bytes = timestampBinary.getBytes();
+
+		// little endian encoding - need to invert byte order
+		long timeOfDayNanos = ByteBuffer.wrap(new byte[] {bytes[7], bytes[6], bytes[5], bytes[4],
+			bytes[3], bytes[2], bytes[1], bytes[0]}).getLong();
+		int julianDay = ByteBuffer.wrap(new byte[] {bytes[11], bytes[10], bytes[9], bytes[8]}).getInt();
+
+		return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
+	}
+
+	private static long julianDayToMillis(int julianDay) {
+		return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
+	}
+}
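
For illustration only (not part of the patch): a sketch that assembles a 12-byte INT96 value and decodes it with the utility above. With julian day 2440589 (one day past the epoch offset 2440588) and one millisecond of time-of-day nanos, the expected result is 86400001 ms.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.flink.formats.parquet.utils.ParquetTimestampUtils;

import org.apache.parquet.io.api.Binary;

public class Int96DecodeExample {
	public static void main(String[] args) {
		long timeOfDayNanos = 1_000_000L;   // 1 millisecond into the day
		int julianDay = 2_440_589;          // epoch offset (2440588) + 1 day

		// INT96 layout: 8 bytes time-of-day nanos followed by 4 bytes julian day, both little endian
		byte[] bytes = ByteBuffer.allocate(12)
			.order(ByteOrder.LITTLE_ENDIAN)
			.putLong(timeOfDayNanos)
			.putInt(julianDay)
			.array();

		long millis = ParquetTimestampUtils.getTimestampMillis(Binary.fromConstantByteArray(bytes));
		System.out.println(millis); // expected: 86400001 (one day plus one millisecond after the epoch, GMT)
	}
}
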
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
new file mode 100644
index 00000000000..bdaa5b5635a
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends {@link GroupConverter} to convert a nested Parquet record into a {@link Row}.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+	private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class);
+	private final Converter[] converters;
+	private final ParentDataHolder parentDataHolder;
+	private final TypeInformation<?> typeInfo;
+	private Row currentRow;
+	private int posInParentRow;
+
+	public RowConverter(MessageType messageType, TypeInformation<?> typeInfo) {
+		this(messageType, typeInfo, null, 0);
+	}
+
+	public RowConverter(GroupType schema, TypeInformation<?> typeInfo, ParentDataHolder parent, int pos) {
+		this.typeInfo = typeInfo;
+		this.parentDataHolder = parent;
+		this.posInParentRow = pos;
+		this.converters = new Converter[schema.getFieldCount()];
+
+		int i = 0;
+		if (typeInfo.getArity() >= 1 && (typeInfo instanceof CompositeType)) {
+			for (Type field : schema.getFields()) {
+				converters[i] = createConverter(field, i, ((CompositeType<?>) typeInfo).getTypeAt(i), this);
+				i++;
+			}
+		}
+	}
+
+	private static Converter createConverter(
+		Type field,
+		int fieldPos,
+		TypeInformation<?> typeInformation,
+		ParentDataHolder parentDataHolder) {
+		if (field.isPrimitive()) {
+			return new RowConverter.RowPrimitiveConverter(field, parentDataHolder, fieldPos);
+		} else if (typeInformation instanceof MapTypeInfo) {
+			return new RowConverter.MapConverter((GroupType) field, (MapTypeInfo) typeInformation,
+				parentDataHolder, fieldPos);
+		} else if (typeInformation instanceof BasicArrayTypeInfo) {
+			Type elementType = field.asGroupType().getFields().get(0);
+			Class typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass();
+			if (typeClass.equals(Character.class)) {
+				return new RowConverter.BasicArrayConverter<Character>((BasicArrayTypeInfo) typeInformation, elementType,
+					Character.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Boolean.class)) {
+				return new RowConverter.BasicArrayConverter<Boolean>((BasicArrayTypeInfo) typeInformation, elementType,
+					Boolean.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Short.class)) {
+				return new RowConverter.BasicArrayConverter<Short>((BasicArrayTypeInfo) typeInformation, elementType,
+					Short.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Integer.class)) {
+				return new RowConverter.BasicArrayConverter<Integer>((BasicArrayTypeInfo) typeInformation, elementType,
+					Integer.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Long.class)) {
+				return new RowConverter.BasicArrayConverter<Long>((BasicArrayTypeInfo) typeInformation, elementType,
+					Long.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Double.class)) {
+				return new RowConverter.BasicArrayConverter<Double>((BasicArrayTypeInfo) typeInformation, elementType,
+					Double.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(String.class)) {
+				return new RowConverter.BasicArrayConverter<String>((BasicArrayTypeInfo) typeInformation, elementType,
+					String.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Date.class)) {
+				return new RowConverter.BasicArrayConverter<Date>((BasicArrayTypeInfo) typeInformation, elementType,
+					Date.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Time.class)) {
+				return new RowConverter.BasicArrayConverter<Time>((BasicArrayTypeInfo) typeInformation, elementType,
+					Time.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Timestamp.class)) {
+				return new RowConverter.BasicArrayConverter<Timestamp>((BasicArrayTypeInfo) typeInformation,
+					elementType, Timestamp.class, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(BigDecimal.class)) {
+				return new RowConverter.BasicArrayConverter<BigDecimal>((BasicArrayTypeInfo) typeInformation,
+					elementType, BigDecimal.class, parentDataHolder, fieldPos);
+			}
+
+			throw new IllegalArgumentException(
+				String.format("Unsupported element type %s for primitive array", typeClass.toString()));
+
+		} else if (typeInformation instanceof ObjectArrayTypeInfo) {
+			return new RowConverter.ObjectArrayConverter(field, (ObjectArrayTypeInfo) typeInformation,
+				parentDataHolder, fieldPos);
+		} else if (typeInformation instanceof RowTypeInfo) {
+			return new RowConverter((GroupType) field, typeInformation, parentDataHolder, fieldPos);
+		}
+
+		// Other types are not supported.
+		return null;
+	}
+
+	@Override
+	public Converter getConverter(int i) {
+		return converters[i];
+	}
+
+	@Override
+	public void start() {
+		this.currentRow = new Row(typeInfo.getArity());
+	}
+
+	public Row getCurrentRow() {
+		return currentRow;
+	}
+
+	@Override
+	public void end() {
+		if (parentDataHolder != null) {
+			parentDataHolder.add(posInParentRow, currentRow);
+		}
+	}
+
+	@Override
+	public void add(int fieldIndex, Object object) {
+		currentRow.setField(fieldIndex, object);
+	}
+
+	static class RowPrimitiveConverter extends PrimitiveConverter {
+		private Type dataType;
+		private OriginalType originalType;
+		private PrimitiveType.PrimitiveTypeName primitiveTypeName;
+		private ParentDataHolder parentDataHolder;
+		private int pos;
+
+		RowPrimitiveConverter(Type dataType, ParentDataHolder parentDataHolder, int pos) {
+			this.dataType = dataType;
+			this.parentDataHolder = parentDataHolder;
+			this.pos = pos;
+			if (dataType.isPrimitive()) {
+				this.originalType = dataType.getOriginalType();
+				this.primitiveTypeName = dataType.asPrimitiveType().getPrimitiveTypeName();
+			} else {
+				// Backward compatibility: the type may be wrapped in an intermediate group layer.
+				Type primitiveType = dataType.asGroupType().getType(0);
+				this.originalType = primitiveType.getOriginalType();
+				this.primitiveTypeName = primitiveType.asPrimitiveType().getPrimitiveTypeName();
+			}
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			// in case it is a timestamp type stored as INT96
+			if (primitiveTypeName.equals(PrimitiveType.PrimitiveTypeName.INT96)) {
+				parentDataHolder.add(pos, new Timestamp(ParquetTimestampUtils.getTimestampMillis(value)));
+				return;
+			}
+
+			if (originalType != null) {
+				switch (originalType) {
+					case DECIMAL:
+						parentDataHolder.add(pos, new BigDecimal(value.toStringUsingUTF8().toCharArray()));
+						break;
+					case UTF8:
+					case ENUM:
+					case JSON:
+					case BSON:
+						parentDataHolder.add(pos, value.toStringUsingUTF8());
+						break;
+					default:
+						throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+							+ " for primitive type BINARY");
+				}
+			} else {
+				parentDataHolder.add(pos, value.toStringUsingUTF8());
+			}
+		}
+
+		@Override
+		public void addBoolean(boolean value) {
+			parentDataHolder.add(pos, value);
+		}
+
+		@Override
+		public void addDouble(double value) {
+			parentDataHolder.add(pos, value);
+		}
+
+		@Override
+		public void addFloat(float value) {
+			parentDataHolder.add(pos, value);
+		}
+
+		@Override
+		public void addInt(int value) {
+			if (originalType != null) {
+				switch (originalType) {
+					case TIME_MICROS:
+					case TIME_MILLIS:
+						parentDataHolder.add(pos, new Time(value));
+						break;
+					case TIMESTAMP_MICROS:
+					case TIMESTAMP_MILLIS:
+						parentDataHolder.add(pos, new Timestamp(value));
+						break;
+					case DATE:
+						parentDataHolder.add(pos, new Date(value));
+						break;
+					case UINT_8:
+					case UINT_16:
+					case UINT_32:
+					case INT_8:
+					case INT_16:
+					case INT_32:
+						parentDataHolder.add(pos, value);
+						break;
+					default:
+						throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+							+ " for primitive type INT32");
+				}
+			} else {
+				parentDataHolder.add(pos, value);
+			}
+		}
+
+		@Override
+		public void addLong(long value) {
+			if (originalType != null) {
+				switch (originalType) {
+					case TIME_MICROS:
+						parentDataHolder.add(pos, new Time(value));
+						break;
+					case TIMESTAMP_MICROS:
+					case TIMESTAMP_MILLIS:
+						parentDataHolder.add(pos, new Timestamp(value));
+						break;
+					case INT_64:
+					case DECIMAL:
+						// long is more efficient than BigDecimal in terms of memory.
+						parentDataHolder.add(pos, value);
+						break;
+					default:
+						throw new UnsupportedOperationException("Unsupported original type : " + originalType.name()
+							+ " for primitive type INT64");
+				}
+			} else {
+				parentDataHolder.add(pos, value);
+			}
+		}
+	}
+
+	static class ObjectArrayConverter<T extends RowTypeInfo> extends GroupConverter implements ParentDataHolder {
+		private final ParentDataHolder parentDataHolder;
+		private final ObjectArrayTypeInfo objectArrayTypeInfo;
+		private final Converter elementConverter;
+		private final Type type;
+		private final int pos;
+		private List<Row> list;
+
+		ObjectArrayConverter(Type type, ObjectArrayTypeInfo typeInfo, ParentDataHolder parentDataHolder, int pos) {
+			this.type = type;
+			this.parentDataHolder = parentDataHolder;
+			this.objectArrayTypeInfo = typeInfo;
+			this.pos = pos;
+			GroupType parquetGroupType = type.asGroupType();
+			Type elementType = parquetGroupType.getType(0);
+			this.elementConverter = createConverter(elementType, 0, objectArrayTypeInfo.getComponentInfo(), this);
+		}
+
+		@Override
+		public Converter getConverter(int fieldIndex) {
+			return elementConverter;
+		}
+
+		@Override
+		public void start() {
+			list = new ArrayList<>();
+		}
+
+		@Override
+		public void end() {
+			parentDataHolder.add(pos, list.toArray((Row[]) Array.newInstance(Row.class, list.size())));
+		}
+
+		@Override
+		public void add(int fieldIndex, Object object) {
+			list.add((Row) object);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	static class BasicArrayConverter<T> extends GroupConverter implements ParentDataHolder {
+		private final ParentDataHolder parentDataHolder;
+		private final BasicArrayTypeInfo typeInfo;
+		private final Type elementType;
+		private final Class elementClass;
+		private final int pos;
+		private List<T> list;
+		private Converter elementConverter;
+
+		BasicArrayConverter(BasicArrayTypeInfo typeInfo, Type elementType, Class primitiveClass,
+							ParentDataHolder parentDataHolder, int pos) {
+			this.typeInfo = typeInfo;
+			this.elementType = elementType;
+			this.elementClass = primitiveClass;
+			this.parentDataHolder = parentDataHolder;
+			this.pos = pos;
+			elementConverter = new RowConverter.RowPrimitiveConverter(elementType, this, 0);
+		}
+
+		@Override
+		public Converter getConverter(int fieldIndex) {
+			return elementConverter;
+		}
+
+		@Override
+		public void start() {
+			list = new ArrayList<>();
+		}
+
+		@Override
+		public void end() {
+			parentDataHolder.add(pos, list.toArray((T[]) Array.newInstance(elementClass, list.size())));
+		}
+
+		@Override
+		public void add(int fieldIndex, Object object) {
+			list.add((T) object);
+		}
+	}
+
+	static class MapConverter extends GroupConverter {
+		private final ParentDataHolder parentDataHolder;
+		private final Converter keyValueConverter;
+		private final MapTypeInfo typeInfo;
+		private final int pos;
+		private Map<Object, Object> map;
+
+		MapConverter(GroupType type, MapTypeInfo typeInfo, ParentDataHolder parentDataHolder, int pos) {
+			this.parentDataHolder = parentDataHolder;
+			this.typeInfo = typeInfo;
+			this.pos = pos;
+			this.keyValueConverter = new MapKeyValueConverter((GroupType) type.getType(0), typeInfo);
+		}
+
+		@Override
+		public Converter getConverter(int fieldIndex) {
+			return keyValueConverter;
+		}
+
+		@Override
+		public void start() {
+			map = new HashMap<>();
+		}
+
+		@Override
+		public void end() {
+			parentDataHolder.add(pos, map);
+		}
+
+		final class MapKeyValueConverter extends GroupConverter {
+			private final Converter keyConverter;
+			private final Converter valueConverter;
+			private Object key;
+			private Object value;
+
+			MapKeyValueConverter(GroupType groupType, MapTypeInfo typeInformation) {
+				this.keyConverter = createConverter(groupType.getType(0), 0,
+					typeInformation.getKeyTypeInfo(), new ParentDataHolder() {
+						@Override
+						public void add(int fieldIndex, Object object) {
+							key = object;
+						}
+					});
+
+				this.valueConverter = createConverter(groupType.getType(1), 1,
+					typeInformation.getValueTypeInfo(), new ParentDataHolder() {
+						@Override
+						public void add(int fieldIndex, Object object) {
+							value = object;
+						}
+					});
+			}
+
+			@Override
+			public Converter getConverter(int fieldIndex) {
+				if (fieldIndex == 0) {
+					return keyConverter;
+				} else {
+					return valueConverter;
+				}
+			}
+
+			@Override
+			public void start() {
+				key = null;
+				value = null;
+			}
+
+			@Override
+			public void end() {
+				map.put(this.key, this.value);
+			}
+		}
+	}
+}
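
The converter above is normally driven by Parquet's record assembly, but its contract can also be exercised by hand. A rough sketch, assuming a schema whose fields line up one-to-one with the Flink type information (the schema string and values are illustrative only):

    MessageType schema = MessageTypeParser.parseMessageType(
        "message Simple { required int64 foo; required binary bar (UTF8); }");
    RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    RowConverter converter = new RowConverter(schema, typeInfo);
    converter.start();                                              // allocates the current Row
    converter.getConverter(0).asPrimitiveConverter().addLong(42L);  // field "foo"
    converter.getConverter(1).asPrimitiveConverter().addBinary(Binary.fromString("hello")); // field "bar"
    converter.end();
    Row row = converter.getCurrentRow();                            // Row of (42, "hello")
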
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowMaterializer.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowMaterializer.java
new file mode 100644
index 00000000000..5a2aa2bb8b5
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowMaterializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Row materializer for {@link RowReadSupport}.
+ */
+public class RowMaterializer extends RecordMaterializer<Row> {
+	private RowConverter root;
+
+	public RowMaterializer(MessageType messageType, TypeInformation<?> rowTypeInfo) {
+		checkNotNull(messageType, "messageType");
+		checkNotNull(rowTypeInfo, "rowTypeInfo");
+		this.root = new RowConverter(messageType, rowTypeInfo);
+	}
+
+	@Override
+	public Row getCurrentRecord() {
+		return root.getCurrentRow();
+	}
+
+	@Override
+	public GroupConverter getRootConverter() {
+		return root;
+	}
+}
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java
new file mode 100644
index 00000000000..d87d58ef53a
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Parquet {@link ReadSupport} implementation for reading Parquet records as {@link Row}.
+ */
+public class RowReadSupport extends ReadSupport<Row> {
+
+	private TypeInformation<?> returnTypeInfo;
+
+	@Override
+	public ReadContext init(InitContext initContext) {
+		checkNotNull(initContext, "initContext");
+		returnTypeInfo = ParquetSchemaConverter.fromParquetType(initContext.getFileSchema());
+		return new ReadContext(initContext.getFileSchema());
+	}
+
+	@Override
+	public RecordMaterializer<Row> prepareForRead(
+		Configuration configuration, Map<String, String> keyValueMetaData,
+		MessageType fileSchema, ReadContext readContext) {
+		return new RowMaterializer(fileSchema, returnTypeInfo);
+	}
+}
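
Besides the Flink input formats, the same ReadSupport can be plugged into Parquet's generic reader. A hedged sketch, assuming ParquetReader.builder(ReadSupport, Path) from parquet-hadoop and a hypothetical file path:

    ParquetReader<Row> reader = ParquetReader
        .builder(new RowReadSupport(), new org.apache.hadoop.fs.Path("/tmp/data.parquet"))
        .build();
    for (Row row = reader.read(); row != null; row = reader.read()) {
        System.out.println(row);
    }
    reader.close();
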
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetInputFormatTest.java
new file mode 100644
index 00000000000..fc623ad2d4f
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetInputFormatTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.formats.parquet.pojo.PojoSimpleRecord;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Simple test case for reading {@link org.apache.flink.types.Row}, Map and Pojo from Parquet files.
+ */
+public class ParquetInputFormatTest {
+	private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+
+	@ClassRule
+	public static TemporaryFolder temp = new TemporaryFolder();
+
+	@Test
+	public void testReadRowFromSimpleRecord() throws IOException {
+		temp.create();
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+		MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+
+		ParquetRowInputFormat rowInputFormat = new ParquetRowInputFormat(path, simpleType);
+
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		rowInputFormat.setRuntimeContext(mockContext);
+
+		FileInputSplit[] splits = rowInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowInputFormat.open(splits[0]);
+
+		Row row = rowInputFormat.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(simple.f2, row);
+	}
+
+	@Test
+	public void testFailureRecoverySimpleRecord() throws IOException {
+		temp.create();
+		List<IndexedRecord> records = new ArrayList<>();
+		Long[] longArray = {1L};
+		for (long i = 0; i < 100; i++) {
+			final SimpleRecord simpleRecord = SimpleRecord.newBuilder()
+				.setBar("test_simple")
+				.setFoo(i)
+				.setArr(Arrays.asList(longArray)).build();
+			records.add(simpleRecord);
+		}
+
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.SIMPLE_SCHEMA, records);
+		MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+
+		ParquetRowInputFormat rowInputFormat = new ParquetRowInputFormat(path, simpleType);
+
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		rowInputFormat.setRuntimeContext(mockContext);
+
+		FileInputSplit[] splits = rowInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+
+		final Tuple2<Long, Long> checkpoint = new Tuple2<>();
+		checkpoint.f0 = 0L;
+		checkpoint.f1 = 51L;
+		rowInputFormat.reopen(splits[0], checkpoint);
+		Row row = rowInputFormat.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(51L, row.getField(0));
+
+		for (int i = 0; i < 10; i++) {
+			rowInputFormat.nextRecord(null);
+		}
+
+		Tuple2<Long, Long> state = rowInputFormat.getCurrentState();
+		assertEquals(0L, state.f0.longValue());
+		assertEquals(62L, state.f1.longValue());
+	}
+
+	@Test
+	public void testReadRowFromNestedRecord() throws IOException {
+		temp.create();
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
+		MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+
+		ParquetRowInputFormat rowInputFormat = new ParquetRowInputFormat(path, nestedType);
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		rowInputFormat.setRuntimeContext(mockContext);
+
+		FileInputSplit[] splits = rowInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowInputFormat.open(splits[0]);
+
+		Row row = rowInputFormat.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(7, row.getArity());
+
+		assertEquals(nested.f2.getField(0), row.getField(0));
+		assertEquals(nested.f2.getField(1), row.getField(1));
+		assertArrayEquals((Long[]) nested.f2.getField(3), (Long[]) row.getField(3));
+		assertArrayEquals((String[]) nested.f2.getField(4), (String[]) row.getField(4));
+		assertEquals(nested.f2.getField(5), row.getField(5));
+		assertArrayEquals((Row[]) nested.f2.getField(6), (Row[]) row.getField(6));
+	}
+
+	@Test
+	public void testReadPojoFromSimpleRecord() throws IOException, NoSuchFieldException {
+		temp.create();
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
+		MessageType messageType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+
+		List<PojoField> fieldList = new ArrayList<>();
+		fieldList.add(new PojoField(PojoSimpleRecord.class.getField("foo"), BasicTypeInfo.LONG_TYPE_INFO));
+		fieldList.add(new PojoField(PojoSimpleRecord.class.getField("bar"), BasicTypeInfo.STRING_TYPE_INFO));
+		fieldList.add(new PojoField(PojoSimpleRecord.class.getField("arr"),
+			BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
+
+		ParquetPojoInputFormat<PojoSimpleRecord> pojoInputFormat =
+			new ParquetPojoInputFormat<PojoSimpleRecord>(path, messageType, new PojoTypeInfo<PojoSimpleRecord>(
+				PojoSimpleRecord.class, fieldList));
+
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		pojoInputFormat.setRuntimeContext(mockContext);
+
+		FileInputSplit[] splits = pojoInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		pojoInputFormat.open(splits[0]);
+
+		PojoSimpleRecord simpleRecord = pojoInputFormat.nextRecord(null);
+		assertEquals(simple.f2.getField(0), simpleRecord.getFoo());
+		assertEquals(simple.f2.getField(1), simpleRecord.getBar());
+		assertArrayEquals((Long[]) simple.f2.getField(2), simpleRecord.getArr());
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testReadMapFromNestedRecord() throws IOException {
+		temp.create();
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
+		MessageType nestedType = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+		ParquetMapInputFormat mapInputFormat = new ParquetMapInputFormat(path, nestedType);
+
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		mapInputFormat.setRuntimeContext(mockContext);
+		FileInputSplit[] splits = mapInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		mapInputFormat.open(splits[0]);
+
+		Map map = mapInputFormat.nextRecord(null);
+		assertNotNull(map);
+		assertArrayEquals((Long[]) map.get("arr"), (Long[]) nested.f2.getField(3));
+		assertArrayEquals((String[]) map.get("strArray"), (String[]) nested.f2.getField(4));
+
+		Map<String, String> mapItem = (Map<String, String>) ((Map) map.get("nestedMap")).get("mapItem");
+		assertEquals("map", mapItem.get("type"));
+		assertEquals("hashMap", mapItem.get("value"));
+
+		List<Map<String, String>> nestedArray = (List<Map<String, String>>) map.get("nestedArray");
+
+		assertEquals(1, nestedArray.size());
+		assertEquals("color", nestedArray.get(0).get("type"));
+		assertEquals("yellow", nestedArray.get(0).get("value"));
+	}
+
+	@Test
+	public void testSerialization() throws Exception {
+		temp.create();
+		Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
+		Path path = TestUtil.createTempParquetFile(temp, TestUtil.SIMPLE_SCHEMA, Collections.singletonList(simple.f1));
+		MessageType simpleType = SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA);
+
+		ParquetRowInputFormat rowInputFormat = new ParquetRowInputFormat(path, simpleType);
+		byte[] bytes = InstantiationUtil.serializeObject(rowInputFormat);
+		ParquetRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+
+		RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
+		Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
+			.when(mockContext).getMetricGroup();
+		copy.setRuntimeContext(mockContext);
+
+		FileInputSplit[] splits = copy.createInputSplits(1);
+		assertEquals(1, splits.length);
+		copy.open(splits[0]);
+
+		Row row = copy.nextRecord(null);
+		assertNotNull(row);
+		assertEquals(simple.f2, row);
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
deleted file mode 100644
index ca8f55faafd..00000000000
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java
+++ /dev/null
@@ -1,517 +0,0 @@
-/**
- * Autogenerated by Avro
- *
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.formats.parquet.generated;
-
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.message.BinaryMessageEncoder;
-import org.apache.avro.message.BinaryMessageDecoder;
-import org.apache.avro.message.SchemaStore;
-
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  private static final long serialVersionUID = -7342141701041388589L;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"org.apache.flink.formats.parquet.generated\",\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-
-  private static SpecificData MODEL$ = new SpecificData();
-
-  private static final BinaryMessageEncoder<Address> ENCODER =
-      new BinaryMessageEncoder<Address>(MODEL$, SCHEMA$);
-
-  private static final BinaryMessageDecoder<Address> DECODER =
-      new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$);
-
-  /**
-   * Return the BinaryMessageDecoder instance used by this class.
-   */
-  public static BinaryMessageDecoder<Address> getDecoder() {
-    return DECODER;
-  }
-
-  /**
-   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
-   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
-   */
-  public static BinaryMessageDecoder<Address> createDecoder(SchemaStore resolver) {
-    return new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$, resolver);
-  }
-
-  /** Serializes this Address to a ByteBuffer. */
-  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
-    return ENCODER.encode(this);
-  }
-
-  /** Deserializes a Address from a ByteBuffer. */
-  public static Address fromByteBuffer(
-      java.nio.ByteBuffer b) throws java.io.IOException {
-    return DECODER.decode(b);
-  }
-
-  @Deprecated public int num;
-  @Deprecated public java.lang.CharSequence street;
-  @Deprecated public java.lang.CharSequence city;
-  @Deprecated public java.lang.CharSequence state;
-  @Deprecated public java.lang.CharSequence zip;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use <code>newBuilder()</code>.
-   */
-  public Address() {}
-
-  /**
-   * All-args constructor.
-   * @param num The new value for num
-   * @param street The new value for street
-   * @param city The new value for city
-   * @param state The new value for state
-   * @param zip The new value for zip
-   */
-  public Address(java.lang.Integer num, java.lang.CharSequence street, java.lang.CharSequence city, java.lang.CharSequence state, java.lang.CharSequence zip) {
-    this.num = num;
-    this.street = street;
-    this.city = city;
-    this.state = state;
-    this.zip = zip;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call.
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return num;
-    case 1: return street;
-    case 2: return city;
-    case 3: return state;
-    case 4: return zip;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  // Used by DatumReader.  Applications should not call.
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: num = (java.lang.Integer)value$; break;
-    case 1: street = (java.lang.CharSequence)value$; break;
-    case 2: city = (java.lang.CharSequence)value$; break;
-    case 3: state = (java.lang.CharSequence)value$; break;
-    case 4: zip = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'num' field.
-   * @return The value of the 'num' field.
-   */
-  public java.lang.Integer getNum() {
-    return num;
-  }
-
-  /**
-   * Sets the value of the 'num' field.
-   * @param value the value to set.
-   */
-  public void setNum(java.lang.Integer value) {
-    this.num = value;
-  }
-
-  /**
-   * Gets the value of the 'street' field.
-   * @return The value of the 'street' field.
-   */
-  public java.lang.CharSequence getStreet() {
-    return street;
-  }
-
-  /**
-   * Sets the value of the 'street' field.
-   * @param value the value to set.
-   */
-  public void setStreet(java.lang.CharSequence value) {
-    this.street = value;
-  }
-
-  /**
-   * Gets the value of the 'city' field.
-   * @return The value of the 'city' field.
-   */
-  public java.lang.CharSequence getCity() {
-    return city;
-  }
-
-  /**
-   * Sets the value of the 'city' field.
-   * @param value the value to set.
-   */
-  public void setCity(java.lang.CharSequence value) {
-    this.city = value;
-  }
-
-  /**
-   * Gets the value of the 'state' field.
-   * @return The value of the 'state' field.
-   */
-  public java.lang.CharSequence getState() {
-    return state;
-  }
-
-  /**
-   * Sets the value of the 'state' field.
-   * @param value the value to set.
-   */
-  public void setState(java.lang.CharSequence value) {
-    this.state = value;
-  }
-
-  /**
-   * Gets the value of the 'zip' field.
-   * @return The value of the 'zip' field.
-   */
-  public java.lang.CharSequence getZip() {
-    return zip;
-  }
-
-  /**
-   * Sets the value of the 'zip' field.
-   * @param value the value to set.
-   */
-  public void setZip(java.lang.CharSequence value) {
-    this.zip = value;
-  }
-
-  /**
-   * Creates a new Address RecordBuilder.
-   * @return A new Address RecordBuilder
-   */
-  public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder() {
-    return new org.apache.flink.formats.parquet.generated.Address.Builder();
-  }
-
-  /**
-   * Creates a new Address RecordBuilder by copying an existing Builder.
-   * @param other The existing builder to copy.
-   * @return A new Address RecordBuilder
-   */
-  public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder(org.apache.flink.formats.parquet.generated.Address.Builder other) {
-    return new org.apache.flink.formats.parquet.generated.Address.Builder(other);
-  }
-
-  /**
-   * Creates a new Address RecordBuilder by copying an existing Address instance.
-   * @param other The existing instance to copy.
-   * @return A new Address RecordBuilder
-   */
-  public static org.apache.flink.formats.parquet.generated.Address.Builder newBuilder(org.apache.flink.formats.parquet.generated.Address other) {
-    return new org.apache.flink.formats.parquet.generated.Address.Builder(other);
-  }
-
-  /**
-   * RecordBuilder for Address instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Address>
-    implements org.apache.avro.data.RecordBuilder<Address> {
-
-    private int num;
-    private java.lang.CharSequence street;
-    private java.lang.CharSequence city;
-    private java.lang.CharSequence state;
-    private java.lang.CharSequence zip;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(SCHEMA$);
-    }
-
-    /**
-     * Creates a Builder by copying an existing Builder.
-     * @param other The existing Builder to copy.
-     */
-    private Builder(org.apache.flink.formats.parquet.generated.Address.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.num)) {
-        this.num = data().deepCopy(fields()[0].schema(), other.num);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.street)) {
-        this.street = data().deepCopy(fields()[1].schema(), other.street);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.city)) {
-        this.city = data().deepCopy(fields()[2].schema(), other.city);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.state)) {
-        this.state = data().deepCopy(fields()[3].schema(), other.state);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.zip)) {
-        this.zip = data().deepCopy(fields()[4].schema(), other.zip);
-        fieldSetFlags()[4] = true;
-      }
-    }
-
-    /**
-     * Creates a Builder by copying an existing Address instance
-     * @param other The existing instance to copy.
-     */
-    private Builder(org.apache.flink.formats.parquet.generated.Address other) {
-            super(SCHEMA$);
-      if (isValidValue(fields()[0], other.num)) {
-        this.num = data().deepCopy(fields()[0].schema(), other.num);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.street)) {
-        this.street = data().deepCopy(fields()[1].schema(), other.street);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.city)) {
-        this.city = data().deepCopy(fields()[2].schema(), other.city);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.state)) {
-        this.state = data().deepCopy(fields()[3].schema(), other.state);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.zip)) {
-        this.zip = data().deepCopy(fields()[4].schema(), other.zip);
-        fieldSetFlags()[4] = true;
-      }
-    }
-
-    /**
-      * Gets the value of the 'num' field.
-      * @return The value.
-      */
-    public java.lang.Integer getNum() {
-      return num;
-    }
-
-    /**
-      * Sets the value of the 'num' field.
-      * @param value The value of 'num'.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder setNum(int value) {
-      validate(fields()[0], value);
-      this.num = value;
-      fieldSetFlags()[0] = true;
-      return this;
-    }
-
-    /**
-      * Checks whether the 'num' field has been set.
-      * @return True if the 'num' field has been set, false otherwise.
-      */
-    public boolean hasNum() {
-      return fieldSetFlags()[0];
-    }
-
-
-    /**
-      * Clears the value of the 'num' field.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder clearNum() {
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /**
-      * Gets the value of the 'street' field.
-      * @return The value.
-      */
-    public java.lang.CharSequence getStreet() {
-      return street;
-    }
-
-    /**
-      * Sets the value of the 'street' field.
-      * @param value The value of 'street'.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder setStreet(java.lang.CharSequence value) {
-      validate(fields()[1], value);
-      this.street = value;
-      fieldSetFlags()[1] = true;
-      return this;
-    }
-
-    /**
-      * Checks whether the 'street' field has been set.
-      * @return True if the 'street' field has been set, false otherwise.
-      */
-    public boolean hasStreet() {
-      return fieldSetFlags()[1];
-    }
-
-
-    /**
-      * Clears the value of the 'street' field.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder clearStreet() {
-      street = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /**
-      * Gets the value of the 'city' field.
-      * @return The value.
-      */
-    public java.lang.CharSequence getCity() {
-      return city;
-    }
-
-    /**
-      * Sets the value of the 'city' field.
-      * @param value The value of 'city'.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder setCity(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.city = value;
-      fieldSetFlags()[2] = true;
-      return this;
-    }
-
-    /**
-      * Checks whether the 'city' field has been set.
-      * @return True if the 'city' field has been set, false otherwise.
-      */
-    public boolean hasCity() {
-      return fieldSetFlags()[2];
-    }
-
-
-    /**
-      * Clears the value of the 'city' field.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder clearCity() {
-      city = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    /**
-      * Gets the value of the 'state' field.
-      * @return The value.
-      */
-    public java.lang.CharSequence getState() {
-      return state;
-    }
-
-    /**
-      * Sets the value of the 'state' field.
-      * @param value The value of 'state'.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder setState(java.lang.CharSequence value) {
-      validate(fields()[3], value);
-      this.state = value;
-      fieldSetFlags()[3] = true;
-      return this;
-    }
-
-    /**
-      * Checks whether the 'state' field has been set.
-      * @return True if the 'state' field has been set, false otherwise.
-      */
-    public boolean hasState() {
-      return fieldSetFlags()[3];
-    }
-
-
-    /**
-      * Clears the value of the 'state' field.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder clearState() {
-      state = null;
-      fieldSetFlags()[3] = false;
-      return this;
-    }
-
-    /**
-      * Gets the value of the 'zip' field.
-      * @return The value.
-      */
-    public java.lang.CharSequence getZip() {
-      return zip;
-    }
-
-    /**
-      * Sets the value of the 'zip' field.
-      * @param value The value of 'zip'.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder setZip(java.lang.CharSequence value) {
-      validate(fields()[4], value);
-      this.zip = value;
-      fieldSetFlags()[4] = true;
-      return this;
-    }
-
-    /**
-      * Checks whether the 'zip' field has been set.
-      * @return True if the 'zip' field has been set, false otherwise.
-      */
-    public boolean hasZip() {
-      return fieldSetFlags()[4];
-    }
-
-
-    /**
-      * Clears the value of the 'zip' field.
-      * @return This builder.
-      */
-    public org.apache.flink.formats.parquet.generated.Address.Builder clearZip() {
-      zip = null;
-      fieldSetFlags()[4] = false;
-      return this;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Address build() {
-      try {
-        Address record = new Address();
-        record.num = fieldSetFlags()[0] ? this.num : (java.lang.Integer) defaultValue(fields()[0]);
-        record.street = fieldSetFlags()[1] ? this.street : (java.lang.CharSequence) defaultValue(fields()[1]);
-        record.city = fieldSetFlags()[2] ? this.city : (java.lang.CharSequence) defaultValue(fields()[2]);
-        record.state = fieldSetFlags()[3] ? this.state : (java.lang.CharSequence) defaultValue(fields()[3]);
-        record.zip = fieldSetFlags()[4] ? this.zip : (java.lang.CharSequence) defaultValue(fields()[4]);
-        return record;
-      } catch (java.lang.Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private static final org.apache.avro.io.DatumWriter<Address>
-    WRITER$ = (org.apache.avro.io.DatumWriter<Address>)MODEL$.createDatumWriter(SCHEMA$);
-
-  @Override public void writeExternal(java.io.ObjectOutput out)
-    throws java.io.IOException {
-    WRITER$.write(this, SpecificData.getEncoder(out));
-  }
-
-  @SuppressWarnings("unchecked")
-  private static final org.apache.avro.io.DatumReader<Address>
-    READER$ = (org.apache.avro.io.DatumReader<Address>)MODEL$.createDatumReader(SCHEMA$);
-
-  @Override public void readExternal(java.io.ObjectInput in)
-    throws java.io.IOException {
-    READER$.read(this, SpecificData.getDecoder(in));
-  }
-
-}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/pojo/PojoSimpleRecord.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/pojo/PojoSimpleRecord.java
new file mode 100644
index 00000000000..e8d006016f5
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/pojo/PojoSimpleRecord.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.pojo;
+
+/**
+ * Simple POJO class for testing ParquetPojoInputFormat.
+ */
+public class PojoSimpleRecord {
+	public Long foo;
+	public String bar;
+	public Long[] arr;
+
+	public PojoSimpleRecord() { }
+
+	public Long getFoo() {
+		return foo;
+	}
+
+	public void setFoo(Long foo) {
+		this.foo = foo;
+	}
+
+	public String getBar() {
+		return bar;
+	}
+
+	public void setBar(String bar) {
+		this.bar = bar;
+	}
+
+	public Long[] getArr() {
+		return arr;
+	}
+
+	public void setArr(Long[] arr) {
+		this.arr = arr;
+	}
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
new file mode 100644
index 00000000000..eb2e305faf3
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test case for reading Parquet records.
+ */
+public class ParquetRecordReaderTest extends TestUtil {
+
+	@Test
+	public void testReadSimpleGroup() throws IOException {
+		temp.create();
+		Configuration configuration = new Configuration();
+
+		Long[] array = {1L};
+		GenericData.Record record = new GenericRecordBuilder(SIMPLE_SCHEMA)
+			.set("bar", "test")
+			.set("foo", 32L)
+			.set("arr", array).build();
+
+		Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(SIMPLE_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(3, row.getArity());
+		assertEquals(32L, row.getField(0));
+		assertEquals("test", row.getField(1));
+		assertArrayEquals(array, (Long[]) row.getField(2));
+		assertEquals(true, rowReader.reachEnd());
+	}
+
+	@Test
+	public void testReadNestedGroup() throws IOException {
+		temp.create();
+		Configuration configuration = new Configuration();
+		Schema schema = unWrapSchema(NESTED_SCHEMA.getField("bar").schema());
+		GenericData.Record barRecord = new GenericRecordBuilder(schema)
+			.set("spam", 31L).build();
+
+		GenericData.Record record = new GenericRecordBuilder(NESTED_SCHEMA)
+			.set("foo", 32L)
+			.set("bar", barRecord)
+			.build();
+
+		Path path = createTempParquetFile(temp, NESTED_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(7, row.getArity());
+		assertEquals(32L, row.getField(0));
+		assertEquals(31L, ((Row) row.getField(2)).getField(0));
+		assertEquals(true, rowReader.reachEnd());
+	}
+
+	@Test
+	public void testMapGroup() throws IOException {
+		temp.create();
+		Configuration configuration = new Configuration();
+		Preconditions.checkState(unWrapSchema(NESTED_SCHEMA.getField("spamMap").schema())
+			.getType().equals(Schema.Type.MAP));
+		ImmutableMap.Builder<String, String> map = ImmutableMap.<String, String>builder();
+		map.put("testKey", "testValue");
+
+		GenericRecord record = new GenericRecordBuilder(NESTED_SCHEMA)
+			.set("foo", 32L)
+			.set("spamMap", map.build())
+			.build();
+
+		Path path = createTempParquetFile(temp, NESTED_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(7, row.getArity());
+
+		assertEquals(32L, row.getField(0));
+		Map<?, ?> result = (Map<?, ?>) row.getField(1);
+		assertEquals(result.get("testKey").toString(), "testValue");
+		assertEquals(true, rowReader.reachEnd());
+	}
+
+	@Test
+	public void testArrayGroup() throws IOException {
+		temp.create();
+		Schema arraySchema = unWrapSchema(NESTED_SCHEMA.getField("arr").schema());
+		Preconditions.checkState(arraySchema.getType().equals(Schema.Type.ARRAY));
+
+		List<Long> arrayData = new ArrayList<>();
+		arrayData.add(1L);
+		arrayData.add(1000L);
+
+		List<String> arrayString = new ArrayList<>();
+		arrayString.add("abcd");
+
+		@SuppressWarnings("unchecked")
+		GenericData.Array array = new GenericData.Array(arraySchema, arrayData);
+
+		GenericRecord record = new GenericRecordBuilder(NESTED_SCHEMA)
+			.set("foo", 32L)
+			.set("arr", array)
+			.set("strArray", arrayString)
+			.build();
+
+		Path path = createTempParquetFile(temp, NESTED_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(7, row.getArity());
+
+		assertEquals(32L, row.getField(0));
+		Long[] result = (Long[]) row.getField(3);
+		assertEquals(1L, result[0].longValue());
+		assertEquals(1000L, result[1].longValue());
+
+		String[] strResult = (String[]) row.getField(4);
+		assertEquals("abcd", strResult[0]);
+	}
+
+	@Test
+	public void testNestedMapGroup() throws IOException {
+		temp.create();
+		Schema nestedMapSchema = unWrapSchema(NESTED_SCHEMA.getField("nestedMap").schema());
+		Preconditions.checkState(nestedMapSchema.getType().equals(Schema.Type.MAP));
+
+		Schema mapValueSchema = nestedMapSchema.getValueType();
+		GenericRecord mapValue = new GenericRecordBuilder(mapValueSchema)
+			.set("type", "nested")
+			.set("value", "nested_value").build();
+
+		ImmutableMap.Builder<String, GenericRecord> map = ImmutableMap.<String, GenericRecord>builder();
+		map.put("testKey", mapValue);
+
+		GenericRecord record = new GenericRecordBuilder(NESTED_SCHEMA)
+			.set("nestedMap", map.build())
+			.set("foo", 34L).build();
+
+		Path path = createTempParquetFile(temp, NESTED_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(7, row.getArity());
+
+		assertEquals(34L, row.getField(0));
+		Map<?, ?> result = (Map<?, ?>) row.getField(5);
+
+		Row nestedRow = (Row) result.get("testKey");
+		assertEquals("nested", nestedRow.getField(0));
+		assertEquals("nested_value", nestedRow.getField(1));
+	}
+
+	@Test
+	public void testNestArrayGroup() throws IOException {
+		temp.create();
+		Schema nestedArraySchema = unWrapSchema(NESTED_SCHEMA.getField("nestedArray").schema());
+		Preconditions.checkState(nestedArraySchema.getType().equals(Schema.Type.ARRAY));
+
+		Schema arrayItemSchema = nestedArraySchema.getElementType();
+		GenericRecord item = new GenericRecordBuilder(arrayItemSchema)
+			.set("type", "nested")
+			.set("value", "nested_value").build();
+
+		ImmutableList.Builder<GenericRecord> list = ImmutableList.<GenericRecord>builder();
+		list.add(item);
+
+		GenericRecord record = new GenericRecordBuilder(NESTED_SCHEMA)
+			.set("nestedArray", list.build())
+			.set("foo", 34L).build();
+
+		Path path = createTempParquetFile(temp, NESTED_SCHEMA, Collections.singletonList(record));
+		MessageType readSchema = (new AvroSchemaConverter()).convert(NESTED_SCHEMA);
+		ParquetRecordReader<Row> rowReader = new ParquetRecordReader<Row>(new RowReadSupport(), readSchema);
+
+		InputFile inputFile =
+			HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
+		ParquetReadOptions options = ParquetReadOptions.builder().build();
+		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+
+		rowReader.initialize(fileReader, TEST_CONFIGURATION);
+		assertEquals(true, rowReader.hasNextRecord());
+
+		Row row = rowReader.nextRecord();
+		assertEquals(7, row.getArity());
+
+		assertEquals(34L, row.getField(0));
+		Object[] result = (Object[]) row.getField(6);
+
+		assertEquals(1, result.length);
+
+		Row nestedRow = (Row) result[0];
+		assertEquals("nested", nestedRow.getField(0));
+		assertEquals("nested_value", nestedRow.getField(1));
+	}
+}
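For reference, the four tests above repeat the same reader setup before asserting on the decoded Row. A minimal sketch of that shared pattern, reusing the TestUtil constants and a path produced by createTempParquetFile; the helper name is illustrative and not part of this PR:

    // Shared setup used by the map/array tests above (sketch only; the method name and
    // placement are hypothetical).
    private ParquetRecordReader<Row> openRowReader(Path path, Schema avroSchema) throws IOException {
        MessageType readSchema = new AvroSchemaConverter().convert(avroSchema);
        ParquetRecordReader<Row> rowReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
        InputFile inputFile =
            HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(path.toUri()), TEST_CONFIGURATION);
        ParquetFileReader fileReader = new ParquetFileReader(inputFile, ParquetReadOptions.builder().build());
        rowReader.initialize(fileReader, TEST_CONFIGURATION);
        return rowReader;
    }

Each test would then reduce to openRowReader(path, NESTED_SCHEMA) followed by its assertions on hasNextRecord(), nextRecord() and reachEnd().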
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
new file mode 100644
index 00000000000..c6676838fe1
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.schema.MessageType;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Simple test case for conversion between Parquet schema and Flink data types.
+ */
+public class ParquetSchemaConverterTest extends TestUtil {
+	private final RowTypeInfo simplyRowType = new RowTypeInfo(
+		new TypeInformation[] {BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+			BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO},
+		new String[] {"foo", "bar", "arr"}
+	);
+
+	private final ObjectArrayTypeInfo nestedArray = ObjectArrayTypeInfo.getInfoFor(
+		new RowTypeInfo(
+			new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"type", "value"})
+	);
+
+	@SuppressWarnings("unchecked")
+	private final MapTypeInfo nestedMap = new MapTypeInfo(
+		BasicTypeInfo.STRING_TYPE_INFO,
+		new RowTypeInfo(
+			new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"type", "value"})
+	);
+
+	@SuppressWarnings("unchecked")
+	private final RowTypeInfo nestedRowType = new RowTypeInfo(
+		new TypeInformation[] {
+			BasicTypeInfo.LONG_TYPE_INFO,
+			new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+			new RowTypeInfo(
+				new TypeInformation[] {BasicTypeInfo.LONG_TYPE_INFO},
+				new String[] {"spam"}
+			),
+			BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
+			BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+			nestedMap,
+			nestedArray
+		},
+		new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"}
+	);
+
+	@Test
+	public void testSimpleSchemaConversion() {
+		MessageType simpleType = new MessageType("simple", SIMPLE_STANDARD_TYPES);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
+		assertEquals(simplyRowType, rowTypeInfo);
+	}
+
+	@Test
+	public void testNestedSchemaConversion() {
+		MessageType nestedTypes = new MessageType("nested", NESTED_TYPES);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
+		assertEquals(nestedRowType, rowTypeInfo);
+	}
+
+	@Test
+	public void testSimpleRowTypeConversion() {
+		MessageType simpleSchema = ParquetSchemaConverter.toParquetType(simplyRowType, true);
+		assertEquals(Arrays.asList(SIMPLE_STANDARD_TYPES), simpleSchema.getFields());
+	}
+
+	@Test
+	public void testNestedRowTypeConversion() {
+		MessageType nestedSchema = ParquetSchemaConverter.toParquetType(nestedRowType, true);
+		assertEquals(Arrays.asList(NESTED_TYPES), nestedSchema.getFields());
+	}
+}
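The two conversion directions exercised above compose into a round trip. A minimal sketch, assuming the same TestUtil constants; the boolean passed to toParquetType simply mirrors its use in the tests above:

    // Round trip through ParquetSchemaConverter, following the calls used in the tests above.
    MessageType parquetSchema = new MessageType("simple", SIMPLE_STANDARD_TYPES);
    RowTypeInfo flinkType = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema);
    MessageType roundTripped = ParquetSchemaConverter.toParquetType(flinkType, true);
    // testSimpleSchemaConversion and testSimpleRowTypeConversion assert the two halves of
    // this round trip: flinkType equals simplyRowType, and roundTripped carries the same
    // field list as SIMPLE_STANDARD_TYPES.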
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
new file mode 100644
index 00000000000..033992e37c0
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.generated.ArrayItem;
+import org.apache.flink.formats.parquet.generated.Bar;
+import org.apache.flink.formats.parquet.generated.MapItem;
+import org.apache.flink.formats.parquet.generated.NestedRecord;
+import org.apache.flink.formats.parquet.generated.SimpleRecord;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.NULL;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Utilities for testing schema conversion and creating test Parquet files.
+ */
+public class TestUtil {
+	@ClassRule
+	public static TemporaryFolder temp = new TemporaryFolder();
+	public static final Configuration TEST_CONFIGURATION = new Configuration();
+	public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
+	public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
+
+	protected static final Type[] SIMPLE_BACK_COMPTIBALE_TYPES = {
+		Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named("foo"),
+		Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+			.as(OriginalType.UTF8).named("bar"),
+		Types.optionalGroup()
+			.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REPEATED).named("array"))
+			.named("arr")
+	};
+
+	protected static final Type[] SIMPLE_STANDARD_TYPES = {
+		Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named("foo"),
+		Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+			.as(OriginalType.UTF8).named("bar"),
+		Types.optionalGroup()
+			.addField(Types.repeatedGroup().addField(
+				Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+					.named("element")).named("list")).as(OriginalType.LIST)
+			.named("arr")};
+
+	protected static final Type[] NESTED_TYPES = {
+		Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named("foo"),
+		Types.optionalMap()
+			.value(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+				.as(OriginalType.UTF8).named("value"))
+			.named("spamMap"),
+		Types.optionalGroup().addField(
+			Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named("spam")).named("bar"),
+		Types.optionalGroup()
+			.addField(Types.repeatedGroup().addField(
+				Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
+				.named("element")).named("list")).as(OriginalType.LIST)
+			.named("arr"),
+		Types.optionalGroup()
+			.addField(Types.repeatedGroup().addField(
+				Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).as(OriginalType.UTF8)
+				.named("element")).named("list")).as(OriginalType.LIST)
+			.named("strArray"),
+		Types.optionalMap().value(Types.optionalGroup()
+			.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+				.as(OriginalType.UTF8).named("type"))
+			.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+				.as(OriginalType.UTF8).named("value"))
+			.named("value"))
+			.named("nestedMap"),
+		Types.optionalGroup().addField(Types.repeatedGroup()
+			.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+				.as(OriginalType.UTF8).named("type"))
+			.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
+				.as(OriginalType.UTF8).named("value"))
+			.named("element")).as(OriginalType.LIST)
+			.named("nestedArray")
+	};
+	private AvroSchemaConverter converter = new AvroSchemaConverter();
+
+	@Ignore
+	public void testSimpleSchemaConversion() {
+		MessageType simpleType = converter.convert(SIMPLE_SCHEMA);
+		assertEquals(SIMPLE_BACK_COMPTIBALE_TYPES.length, simpleType.getFieldCount());
+
+		for (int i = 0; i < simpleType.getFieldCount(); i++) {
+			assertEquals(SIMPLE_BACK_COMPTIBALE_TYPES[i], simpleType.getType(i));
+		}
+	}
+
+	/**
+	 * This test case uses the Parquet-native Avro schema converter, which converts
+	 * Avro map values to the required repetition. However, komodo writes Parquet files
+	 * with the map value type as optional, so this test case is not meaningful as-is.
+	 * It is kept here, ignored, for later reference.
+	 */
+	@Ignore
+	public void testNestedSchemaConversion() {
+		MessageType nestedType = converter.convert(NESTED_SCHEMA);
+		assertEquals(NESTED_TYPES.length, nestedType.getFieldCount());
+
+		for (int i = 0; i < nestedType.getFieldCount(); i++) {
+			assertEquals(NESTED_TYPES[i], nestedType.getType(i));
+		}
+	}
+
+	public static Path createTempParquetFile(
+		TemporaryFolder temporaryFolder,
+		Schema schema,
+		List<IndexedRecord> records) throws IOException {
+		File root = temporaryFolder.getRoot();
+		Path path = new Path(root.getPath(), UUID.randomUUID().toString());
+		ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
+			new org.apache.hadoop.fs.Path(path.toUri())).withSchema(schema).build();
+
+		for (IndexedRecord record : records) {
+			writer.write(record);
+		}
+
+		writer.close();
+		return path;
+	}
+
+	private static Schema getTestSchema(String schemaName) {
+		try {
+			InputStream inputStream = TestUtil.class.getClassLoader()
+				.getResourceAsStream("avro/" + schemaName);
+			return new Schema.Parser().parse(inputStream);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	protected Schema unWrapSchema(Schema o) {
+		List<Schema> schemas = o.getTypes();
+		Preconditions.checkArgument(schemas.size() == 2, "Invalid union type");
+		return schemas.get(0).getType() == NULL ? schemas.get(1) : schemas.get(0);
+	}
+
+	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleRecordTestData() {
+		Long[] longArray = {1L};
+		final SimpleRecord simpleRecord = SimpleRecord.newBuilder()
+			.setBar("test_simple")
+			.setFoo(1L)
+			.setArr(Arrays.asList(longArray)).build();
+
+		final Row simpleRow = new Row(3);
+		simpleRow.setField(0, 1L);
+		simpleRow.setField(1, "test_simple");
+		simpleRow.setField(2, longArray);
+
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+		t.f0 = SimpleRecord.class;
+		t.f1 = simpleRecord;
+		t.f2 = simpleRow;
+
+		return t;
+	}
+
+	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getNestedRecordTestData() {
+		final Bar bar = Bar.newBuilder()
+			.setSpam(1L).build();
+
+		final ArrayItem arrayItem = ArrayItem.newBuilder()
+			.setType("color")
+			.setValue("yellow").build();
+
+		final MapItem mapItem = MapItem.newBuilder()
+			.setType("map")
+			.setValue("hashMap").build();
+
+		List<ArrayItem> nestedArray = new ArrayList<>();
+		nestedArray.add(arrayItem);
+
+		Map<CharSequence, MapItem> nestedMap = new HashMap<>();
+		nestedMap.put("mapItem", mapItem);
+
+		List<Long> longArray = new ArrayList<>();
+		longArray.add(1L);
+
+		List<CharSequence> stringArray = new ArrayList<>();
+		stringArray.add("String");
+
+		Long[] primitiveLongArray = {1L};
+		String[] primitiveStringArray = {"String"};
+
+		final NestedRecord nestedRecord = NestedRecord.newBuilder()
+			.setBar(bar)
+			.setNestedArray(nestedArray)
+			.setStrArray(stringArray)
+			.setNestedMap(nestedMap)
+			.setArr(longArray).build();
+
+		final Row barRow = new Row(1);
+		barRow.setField(0, 1L);
+
+		final Row arrayItemRow = new Row(2);
+		arrayItemRow.setField(0, "color");
+		arrayItemRow.setField(1, "yellow");
+
+		final Row mapItemRow = new Row(2);
+		mapItemRow.setField(0, "map");
+		mapItemRow.setField(1, "hashMap");
+
+		Row[] nestedRowArray = {arrayItemRow};
+		Map<String, Row> nestedRowMap = new HashMap<>();
+		nestedRowMap.put("mapItem", mapItemRow);
+
+		final Row nestedRow = new Row(7);
+		nestedRow.setField(2, barRow);
+		nestedRow.setField(3, primitiveLongArray);
+		nestedRow.setField(4, primitiveStringArray);
+		nestedRow.setField(5, nestedRowMap);
+		nestedRow.setField(6, nestedRowArray);
+
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
+		t.f0 = NestedRecord.class;
+		t.f1 = nestedRecord;
+		t.f2 = nestedRow;
+
+		return t;
+	}
+
+	public static void main(String[] args) throws IOException {
+		temp.create();
+
+		GenericData.Record record = new GenericRecordBuilder(SIMPLE_SCHEMA)
+			.set("bar", "test")
+			.set("foo", 32L).build();
+
+		Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, Collections.singletonList(record));
+		ParquetReader<Group> reader = ParquetReader.builder(
+			new GroupReadSupport(), new org.apache.hadoop.fs.Path(path.toUri())).withConf(new Configuration()).build();
+
+		Group group = reader.read();
+		while (group != null) {
+			System.out.println(group.toString());
+			group = reader.read();
+		}
+	}
+}
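As a usage note, the helpers above are meant to be combined: createTempParquetFile writes Avro records into a throwaway Parquet file, while getSimpleRecordTestData and getNestedRecordTestData pair a SpecificRecord with the Row the format is expected to produce. A sketch of the presumed pattern (the actual input format tests live in other files of this PR):

    // Write the SpecificRecord half of the test data, then read it back and compare the
    // decoded Row against the expected half; sketch only, not part of the PR's tests.
    Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> data = getSimpleRecordTestData();
    Path path = createTempParquetFile(temp, SIMPLE_SCHEMA, Collections.singletonList(data.f1));
    // ... open a reader for `path` as in ParquetRecordReaderTest and assert that the
    // decoded Row equals data.f2 field by field.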
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
new file mode 100644
index 00000000000..f40dcf2cb57
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
@@ -0,0 +1,35 @@
+{
+  "name": "NestedRecord",
+  "namespace": "org.apache.flink.formats.parquet.generated",
+  "type": "record",
+  "fields": [
+      { "default": null, "name": "foo", "type": [ "null", "long" ]},
+      { "default": null, "name": "spamMap", "type": [ "null", {"type": "map", "values": "string"} ]},
+      { "default": null, "name": "bar", "type": [ "null", {
+        "type": "record",
+        "name": "Bar",
+        "fields": [ { "default": null, "name": "spam", "type": [ "null", "long" ] }]
+        }]
+      },
+      { "default": null, "name": "arr", "type": [ "null", {"type": "array", "items": "long"} ]},
+      { "default": null, "name": "strArray", "type": [ "null", {"type": "array", "items": "string"} ]},
+      { "default": null, "name": "nestedMap", "type": [ "null", {"type": "map", "values": {
+        "type": "record",
+        "name": "MapItem",
+        "fields": [
+         {"name": "type", "type": ["null", "string"]},
+         {"name": "value", "type": ["null", "string"]}]}
+        }]
+      },
+      { "default": null, "name": "nestedArray", "type": [ "null", {"type": "array", "items": {
+        "type": "record",
+        "name": "ArrayItem",
+        "fields": [
+           {"name": "type", "type": ["null", "string"]},
+           {"name": "value", "type": ["null", "string"]}]}
+        }]
+      }
+  ],
+  "schema_id": 1,
+  "type": "record"
+}
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/simple.avsc b/flink-formats/flink-parquet/src/test/resources/avro/simple.avsc
new file mode 100644
index 00000000000..e0eebbee227
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/resources/avro/simple.avsc
@@ -0,0 +1,12 @@
+{
+  "name": "SimpleRecord",
+  "namespace": "org.apache.flink.formats.parquet.generated",
+  "type": "record",
+  "fields": [
+    { "default": null, "name": "foo", "type": [ "null", "long" ]},
+    { "default": null, "name": "bar", "type": [ "null", "string"]},
+    { "default": null, "name": "arr", "type": [ "null", {"type": "array", "items": "long"} ]}
+  ],
+  "schema_id": 1,
+  "type": "record"
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services