Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/21 08:22:36 UTC

[flink] branch release-1.11 updated: [FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d00fa4c  [FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive
d00fa4c is described below

commit d00fa4c67a20cfcff01101d1bb76026478329921
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu May 21 16:21:55 2020 +0800

    [FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive
    
    
    This closes #12274
---
 .../read/HiveVectorizedParquetSplitReader.java     |  1 +
 .../flink/connectors/hive/HiveTableSourceTest.java | 35 +++++++++++++++++-
 .../flink/table/catalog/hive/HiveTestUtils.java    | 12 ++++++
 .../parquet/ParquetFileSystemFormatFactory.java    |  1 +
 .../vector/ParquetColumnarRowSplitReader.java      | 43 ++++++++++++++++++----
 .../parquet/vector/ParquetSplitReaderUtil.java     |  2 +
 .../parquet/row/ParquetRowDataWriterTest.java      |  1 +
 .../vector/ParquetColumnarRowSplitReaderTest.java  |  3 ++
 8 files changed, 90 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
index f16becf..7823b71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
@@ -63,6 +63,7 @@ public class HiveVectorizedParquetSplitReader implements SplitReader {
 
 		this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
 				hiveVersion.startsWith("3"),
+				false, // hive case insensitive
 				conf,
 				fieldNames,
 				fieldTypes,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 3c05d70..6c1c483 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -46,6 +47,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
 import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink;
 import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
@@ -81,6 +83,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.table.catalog.hive.HiveTestUtils.createTableEnvWithHiveCatalog;
+import static org.apache.flink.table.catalog.hive.HiveTestUtils.waitForJobFinish;
 import static org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -95,7 +99,7 @@ import static org.mockito.Mockito.spy;
  * Tests {@link HiveTableSource}.
  */
 @RunWith(FlinkStandaloneHiveRunner.class)
-public class HiveTableSourceTest {
+public class HiveTableSourceTest extends BatchAbstractTestBase {
 
 	@HiveSQL(files = {})
 	private static HiveShell hiveShell;
@@ -639,6 +643,35 @@ public class HiveTableSourceTest {
 		assertEquals("[1,a, 2,b]", results.toString());
 	}
 
+	@Test
+	public void testParquetCaseInsensitive() throws Exception {
+		testCaseInsensitive("parquet");
+	}
+
+	private void testCaseInsensitive(String format) throws Exception {
+		TableEnvironment tEnv = createTableEnvWithHiveCatalog(hiveCatalog);
+		String folderURI = TEMPORARY_FOLDER.newFolder().toURI().toString();
+
+		// Flink to write sensitive fields to parquet file
+		tEnv.executeSql(String.format(
+				"create table parquet_t (I int, J int) with (" +
+						"'connector'='filesystem','format'='%s','path'='%s')",
+				format,
+				folderURI));
+		waitForJobFinish(tEnv.executeSql("insert into parquet_t select 1, 2"));
+		tEnv.executeSql("drop table parquet_t");
+
+		// Hive to read parquet file
+		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+		tEnv.executeSql(String.format(
+				"create external table parquet_t (i int, j int) stored as %s location '%s'",
+				format,
+				folderURI));
+		Assert.assertEquals(
+				Row.of(1, 2),
+				tEnv.executeSql("select * from parquet_t").collect().next());
+	}
+
 	/**
 	 * A sub-class of HiveTableSource to test vector reader switch.
 	 */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 3519a58..252c6e1 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -126,6 +127,17 @@ public class HiveTestUtils {
 		return tableEnv;
 	}
 
+	public static TableEnvironment createTableEnvWithHiveCatalog(HiveCatalog catalog) {
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		tableEnv.registerCatalog(catalog.getName(), catalog);
+		tableEnv.useCatalog(catalog.getName());
+		return tableEnv;
+	}
+
+	public static void waitForJobFinish(TableResult tableResult) throws Exception {
+		tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+	}
+
 	// Insert into a single partition of a text table.
 	public static TextTableInserter createTextTableInserter(HiveShell hiveShell, String dbName, String tableName) {
 		return new TextTableInserter(hiveShell, dbName, tableName);
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
index a614164..24dfaf4 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
@@ -172,6 +172,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory {
 
 			this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
 					utcTimestamp,
+					true,
 					conf.conf(),
 					fullFieldNames,
 					fullFieldTypes,
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
index c7bb093..b13993c 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -43,7 +44,10 @@ import org.apache.parquet.schema.Types;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
 import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector;
@@ -105,6 +109,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
 
 	public ParquetColumnarRowSplitReader(
 			boolean utcTimestamp,
+			boolean caseSensitive,
 			Configuration conf,
 			LogicalType[] selectedTypes,
 			String[] selectedFieldNames,
@@ -123,7 +128,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
 		List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
 
 		this.fileSchema = footer.getFileMetaData().getSchema();
-		this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames);
+		this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
 		this.reader = new ParquetFileReader(
 				conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
 
@@ -146,15 +151,39 @@ public class ParquetColumnarRowSplitReader implements Closeable {
 	/**
 	 * Clips `parquetSchema` according to `fieldNames`.
 	 */
-	private static MessageType clipParquetSchema(GroupType parquetSchema, String[] fieldNames) {
+	private static MessageType clipParquetSchema(
+			GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
 		Type[] types = new Type[fieldNames.length];
-		for (int i = 0; i < fieldNames.length; ++i) {
-			String fieldName = fieldNames[i];
-			if (parquetSchema.getFieldIndex(fieldName) < 0) {
-				throw new IllegalArgumentException(fieldName + " does not exist");
+		if (caseSensitive) {
+			for (int i = 0; i < fieldNames.length; ++i) {
+				String fieldName = fieldNames[i];
+				if (parquetSchema.getFieldIndex(fieldName) < 0) {
+					throw new IllegalArgumentException(fieldName + " does not exist");
+				}
+				types[i] = parquetSchema.getType(fieldName);
+			}
+		} else {
+			Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
+			for (Type type : parquetSchema.getFields()) {
+				caseInsensitiveFieldMap.compute(type.getName().toLowerCase(Locale.ROOT),
+						(key, previousType) -> {
+					if (previousType != null) {
+						throw new FlinkRuntimeException(
+								"Parquet with case insensitive mode should have no duplicate key: " + key);
+					}
+					return type;
+				});
+			}
+			for (int i = 0; i < fieldNames.length; ++i) {
+				Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
+				if (type == null) {
+					throw new IllegalArgumentException(fieldNames[i] + " does not exist");
+				}
+				// TODO clip for array,map,row types.
+				types[i] = type;
 			}
-			types[i] = parquetSchema.getType(fieldName);
 		}
+
 		return Types.buildMessage().addFields(types).named("flink-parquet");
 	}
 
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
index 280cfab..0394336 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
@@ -83,6 +83,7 @@ public class ParquetSplitReaderUtil {
 	 */
 	public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
 			boolean utcTimestamp,
+			boolean caseSensitive,
 			Configuration conf,
 			String[] fullFieldNames,
 			DataType[] fullFieldTypes,
@@ -119,6 +120,7 @@ public class ParquetSplitReaderUtil {
 
 		return new ParquetColumnarRowSplitReader(
 				utcTimestamp,
+				caseSensitive,
 				conf,
 				Arrays.stream(selParquetFields)
 						.mapToObj(i -> fullFieldTypes[i].getLogicalType())
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index eecf1b8..f743b9c 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -139,6 +139,7 @@ public class ParquetRowDataWriterTest {
 		// verify
 		ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
 				utcTimestamp,
+				true,
 				conf,
 				ROW_TYPE.getFieldNames().toArray(new String[0]),
 				ROW_TYPE.getChildren().stream()
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
index 93812df..bea997e 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
@@ -205,6 +205,7 @@ public class ParquetColumnarRowSplitReaderTest {
 
 		ParquetColumnarRowSplitReader reader = new ParquetColumnarRowSplitReader(
 				false,
+				true,
 				new Configuration(),
 				fieldTypes,
 				new String[] {
@@ -402,6 +403,7 @@ public class ParquetColumnarRowSplitReaderTest {
 				new IntType()};
 		ParquetColumnarRowSplitReader reader = new ParquetColumnarRowSplitReader(
 				false,
+				true,
 				new Configuration(),
 				fieldTypes,
 				new String[] {"f7", "f2", "f4"},
@@ -494,6 +496,7 @@ public class ParquetColumnarRowSplitReaderTest {
 				new VarCharType(VarCharType.MAX_LENGTH)};
 		ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
 				false,
+				true,
 				new Configuration(),
 				IntStream.range(0, 28).mapToObj(i -> "f" + i).toArray(String[]::new),
 				Arrays.stream(fieldTypes)
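
The core of the change is in ParquetColumnarRowSplitReader#clipParquetSchema above: a new caseSensitive flag is threaded down from ParquetSplitReaderUtil.genPartColumnarRowReader, the Hive split reader passes false (Hive declares its columns in lower case regardless of how the Parquet file was written, which is what the new testParquetCaseInsensitive exercises), while ParquetFileSystemFormatFactory and the existing reader/writer tests keep the previous behavior by passing true. With case sensitivity off, the reader builds a lower-cased map of the file's fields, rejects files whose field names would collide once case is ignored, and resolves each requested field through that map. Below is a minimal, self-contained sketch of that lookup, not the committed code: field types are modeled as plain Strings instead of org.apache.parquet.schema.Type, and the class and method names are illustrative only.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Locale;
    import java.util.Map;

    /** Illustrative sketch of case-insensitive schema clipping; not the committed code. */
    public class CaseInsensitiveClipSketch {

        /** Resolves the requested field names against the file's fields (name -> type). */
        static Map<String, String> clip(
                Map<String, String> fileFields, String[] requestedNames, boolean caseSensitive) {
            Map<String, String> lookup;
            if (caseSensitive) {
                // Exact-name lookup: requested names must match the file schema verbatim.
                lookup = fileFields;
            } else {
                // Lower-cased view of the file schema; duplicates are rejected because two
                // columns that differ only in case would be ambiguous once case is ignored.
                lookup = new HashMap<>();
                for (Map.Entry<String, String> field : fileFields.entrySet()) {
                    String key = field.getKey().toLowerCase(Locale.ROOT);
                    if (lookup.put(key, field.getValue()) != null) {
                        throw new RuntimeException(
                                "Parquet with case insensitive mode should have no duplicate key: " + key);
                    }
                }
            }
            Map<String, String> clipped = new LinkedHashMap<>();
            for (String name : requestedNames) {
                String type = lookup.get(caseSensitive ? name : name.toLowerCase(Locale.ROOT));
                if (type == null) {
                    throw new IllegalArgumentException(name + " does not exist");
                }
                clipped.put(name, type);
            }
            return clipped;
        }

        public static void main(String[] args) {
            Map<String, String> fileSchema = new LinkedHashMap<>();
            fileSchema.put("I", "int32"); // written by Flink with upper-case names, as in the test
            fileSchema.put("J", "int32");
            // Hive asks for lower-case names; with caseSensitive=false they still resolve.
            System.out.println(clip(fileSchema, new String[] {"i", "j"}, false)); // {i=int32, j=int32}
        }
    }

In the committed reader the resolved org.apache.parquet.schema.Type entries are assembled into the requested schema via Types.buildMessage().addFields(types).named("flink-parquet"), exactly as in the diff; the sketch only illustrates the name-resolution step.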