Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/21 08:22:36 UTC
[flink] branch release-1.11 updated: [FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new d00fa4c [FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive
d00fa4c is described below
commit d00fa4c67a20cfcff01101d1bb76026478329921
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu May 21 16:21:55 2020 +0800
[FLINK-17474][parquet][hive] Parquet reader should be case insensitive for hive
This closes #12274
---
.../read/HiveVectorizedParquetSplitReader.java | 1 +
.../flink/connectors/hive/HiveTableSourceTest.java | 35 +++++++++++++++++-
.../flink/table/catalog/hive/HiveTestUtils.java | 12 ++++++
.../parquet/ParquetFileSystemFormatFactory.java | 1 +
.../vector/ParquetColumnarRowSplitReader.java | 43 ++++++++++++++++++----
.../parquet/vector/ParquetSplitReaderUtil.java | 2 +
.../parquet/row/ParquetRowDataWriterTest.java | 1 +
.../vector/ParquetColumnarRowSplitReaderTest.java | 3 ++
8 files changed, 90 insertions(+), 8 deletions(-)
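
A quick orientation before the diff: the heart of the change is clipParquetSchema in ParquetColumnarRowSplitReader, which, when caseSensitive is false, resolves requested field names against the Parquet footer's field names through a lower-cased index. A minimal standalone sketch of that matching strategy follows (the class and method names here are illustrative, not part of the commit):

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CaseInsensitiveLookupSketch {

    // Index schema field names by their lower-cased form; reject schemas whose
    // names collide once lower-cased (the commit throws FlinkRuntimeException
    // in the same situation).
    static Map<String, String> buildLowerCaseIndex(String[] schemaFields) {
        Map<String, String> index = new HashMap<>();
        for (String field : schemaFields) {
            String key = field.toLowerCase(Locale.ROOT);
            if (index.putIfAbsent(key, field) != null) {
                throw new IllegalStateException("duplicate key after lower-casing: " + key);
            }
        }
        return index;
    }

    public static void main(String[] args) {
        // A file written by Flink with upper-case column names ("I", "J") can
        // now be queried through Hive with lower-case names ("i", "j").
        Map<String, String> index = buildLowerCaseIndex(new String[] {"I", "J"});
        System.out.println(index.get("i")); // prints I
        System.out.println(index.get("j")); // prints J
    }
}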
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
index f16becf..7823b71 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedParquetSplitReader.java
@@ -63,6 +63,7 @@ public class HiveVectorizedParquetSplitReader implements SplitReader {
         this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
                 hiveVersion.startsWith("3"),
+                false, // hive case insensitive
                 conf,
                 fieldNames,
                 fieldTypes,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 3c05d70..6c1c483 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -46,6 +47,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase;
import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.table.planner.runtime.utils.TestingAppendRowDataSink;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
@@ -81,6 +83,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.flink.table.catalog.hive.HiveTestUtils.createTableEnvWithHiveCatalog;
+import static org.apache.flink.table.catalog.hive.HiveTestUtils.waitForJobFinish;
import static org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -95,7 +99,7 @@ import static org.mockito.Mockito.spy;
  * Tests {@link HiveTableSource}.
  */
 @RunWith(FlinkStandaloneHiveRunner.class)
-public class HiveTableSourceTest {
+public class HiveTableSourceTest extends BatchAbstractTestBase {
 
     @HiveSQL(files = {})
     private static HiveShell hiveShell;
@@ -639,6 +643,35 @@ public class HiveTableSourceTest {
         assertEquals("[1,a, 2,b]", results.toString());
     }
 
+    @Test
+    public void testParquetCaseInsensitive() throws Exception {
+        testCaseInsensitive("parquet");
+    }
+
+    private void testCaseInsensitive(String format) throws Exception {
+        TableEnvironment tEnv = createTableEnvWithHiveCatalog(hiveCatalog);
+        String folderURI = TEMPORARY_FOLDER.newFolder().toURI().toString();
+
+        // Flink to write sensitive fields to parquet file
+        tEnv.executeSql(String.format(
+                "create table parquet_t (I int, J int) with (" +
+                        "'connector'='filesystem','format'='%s','path'='%s')",
+                format,
+                folderURI));
+        waitForJobFinish(tEnv.executeSql("insert into parquet_t select 1, 2"));
+        tEnv.executeSql("drop table parquet_t");
+
+        // Hive to read parquet file
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(String.format(
+                "create external table parquet_t (i int, j int) stored as %s location '%s'",
+                format,
+                folderURI));
+        Assert.assertEquals(
+                Row.of(1, 2),
+                tEnv.executeSql("select * from parquet_t").collect().next());
+    }
+
     /**
      * A sub-class of HiveTableSource to test vector reader switch.
      */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 3519a58..252c6e1 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -126,6 +127,17 @@ public class HiveTestUtils {
         return tableEnv;
     }
 
+    public static TableEnvironment createTableEnvWithHiveCatalog(HiveCatalog catalog) {
+        TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+        tableEnv.registerCatalog(catalog.getName(), catalog);
+        tableEnv.useCatalog(catalog.getName());
+        return tableEnv;
+    }
+
+    public static void waitForJobFinish(TableResult tableResult) throws Exception {
+        tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+    }
+
     // Insert into a single partition of a text table.
     public static TextTableInserter createTextTableInserter(HiveShell hiveShell, String dbName, String tableName) {
         return new TextTableInserter(hiveShell, dbName, tableName);
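
The two helpers above are consumed by the new test in HiveTableSourceTest; a condensed usage sketch (assuming a HiveCatalog instance named hiveCatalog, as in that test class):

TableEnvironment tEnv = HiveTestUtils.createTableEnvWithHiveCatalog(hiveCatalog);
// Block until the INSERT job finishes so the files exist before reading them back.
HiveTestUtils.waitForJobFinish(tEnv.executeSql("insert into some_table select 1, 2"));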
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
index a614164..24dfaf4 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
@@ -172,6 +172,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory {
             this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
                     utcTimestamp,
+                    true,
                     conf.conf(),
                     fullFieldNames,
                     fullFieldTypes,
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
index c7bb093..b13993c 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.vector.VectorizedColumnBatch;
import org.apache.flink.table.data.vector.writable.WritableColumnVector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -43,7 +44,10 @@ import org.apache.parquet.schema.Types;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector;
@@ -105,6 +109,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
     public ParquetColumnarRowSplitReader(
             boolean utcTimestamp,
+            boolean caseSensitive,
             Configuration conf,
             LogicalType[] selectedTypes,
             String[] selectedFieldNames,
@@ -123,7 +128,7 @@ public class ParquetColumnarRowSplitReader implements Closeable {
         List<BlockMetaData> blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
         this.fileSchema = footer.getFileMetaData().getSchema();
 
-        this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames);
+        this.requestedSchema = clipParquetSchema(fileSchema, selectedFieldNames, caseSensitive);
         this.reader = new ParquetFileReader(
             conf, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
@@ -146,15 +151,39 @@ public class ParquetColumnarRowSplitReader implements Closeable {
     /**
      * Clips `parquetSchema` according to `fieldNames`.
      */
-    private static MessageType clipParquetSchema(GroupType parquetSchema, String[] fieldNames) {
+    private static MessageType clipParquetSchema(
+            GroupType parquetSchema, String[] fieldNames, boolean caseSensitive) {
         Type[] types = new Type[fieldNames.length];
-        for (int i = 0; i < fieldNames.length; ++i) {
-            String fieldName = fieldNames[i];
-            if (parquetSchema.getFieldIndex(fieldName) < 0) {
-                throw new IllegalArgumentException(fieldName + " does not exist");
+        if (caseSensitive) {
+            for (int i = 0; i < fieldNames.length; ++i) {
+                String fieldName = fieldNames[i];
+                if (parquetSchema.getFieldIndex(fieldName) < 0) {
+                    throw new IllegalArgumentException(fieldName + " does not exist");
+                }
+                types[i] = parquetSchema.getType(fieldName);
+            }
+        } else {
+            Map<String, Type> caseInsensitiveFieldMap = new HashMap<>();
+            for (Type type : parquetSchema.getFields()) {
+                caseInsensitiveFieldMap.compute(type.getName().toLowerCase(Locale.ROOT),
+                        (key, previousType) -> {
+                            if (previousType != null) {
+                                throw new FlinkRuntimeException(
+                                        "Parquet with case insensitive mode should have no duplicate key: " + key);
+                            }
+                            return type;
+                        });
+            }
+            for (int i = 0; i < fieldNames.length; ++i) {
+                Type type = caseInsensitiveFieldMap.get(fieldNames[i].toLowerCase(Locale.ROOT));
+                if (type == null) {
+                    throw new IllegalArgumentException(fieldNames[i] + " does not exist");
+                }
+                // TODO clip for array,map,row types.
+                types[i] = type;
             }
-            types[i] = parquetSchema.getType(fieldName);
         }
+
         return Types.buildMessage().addFields(types).named("flink-parquet");
     }
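
A side note on the Locale.ROOT lower-casing above: using the JVM default locale would make field matching locale-dependent; under a Turkish locale, for instance, "I".toLowerCase() yields the dotless "ı" and would no longer match a column written as "i". A tiny demo of the difference (hypothetical class name):

import java.util.Locale;

public class LocaleRootDemo {
    public static void main(String[] args) {
        System.out.println("I".toLowerCase(Locale.ROOT));      // i
        System.out.println("I".toLowerCase(new Locale("tr"))); // ı (dotless i)
    }
}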
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
index 280cfab..0394336 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
@@ -83,6 +83,7 @@ public class ParquetSplitReaderUtil {
      */
     public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
             boolean utcTimestamp,
+            boolean caseSensitive,
             Configuration conf,
             String[] fullFieldNames,
             DataType[] fullFieldTypes,
@@ -119,6 +120,7 @@ public class ParquetSplitReaderUtil {
         return new ParquetColumnarRowSplitReader(
             utcTimestamp,
+            caseSensitive,
             conf,
             Arrays.stream(selParquetFields)
                 .mapToObj(i -> fullFieldTypes[i].getLogicalType())
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index eecf1b8..f743b9c 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -139,6 +139,7 @@ public class ParquetRowDataWriterTest {
         // verify
         ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
                 utcTimestamp,
+                true,
                 conf,
                 ROW_TYPE.getFieldNames().toArray(new String[0]),
                 ROW_TYPE.getChildren().stream()
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
index 93812df..bea997e 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
@@ -205,6 +205,7 @@ public class ParquetColumnarRowSplitReaderTest {
         ParquetColumnarRowSplitReader reader = new ParquetColumnarRowSplitReader(
                 false,
+                true,
                 new Configuration(),
                 fieldTypes,
                 new String[] {
@@ -402,6 +403,7 @@ public class ParquetColumnarRowSplitReaderTest {
                 new IntType()};
         ParquetColumnarRowSplitReader reader = new ParquetColumnarRowSplitReader(
                 false,
+                true,
                 new Configuration(),
                 fieldTypes,
                 new String[] {"f7", "f2", "f4"},
@@ -494,6 +496,7 @@ public class ParquetColumnarRowSplitReaderTest {
                 new VarCharType(VarCharType.MAX_LENGTH)};
         ParquetColumnarRowSplitReader reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
                 false,
+                true,
                 new Configuration(),
                 IntStream.range(0, 28).mapToObj(i -> "f" + i).toArray(String[]::new),
                 Arrays.stream(fieldTypes)