Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/07 09:34:09 UTC
[flink-table-store] branch master updated: [FLINK-28110] Support projection pushdown for Hive readers
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 952b62a7 [FLINK-28110] Support projection pushdown for Hive readers
952b62a7 is described below
commit 952b62a794928391f00f98218f5fab86b6bf5ee2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jul 7 17:34:04 2022 +0800
[FLINK-28110] Support projection pushdown for Hive readers
This closes #202
---
.../flink/table/store/utils/ProjectedRowData.java | 3 +
.../table/store/mapred/TableStoreInputFormat.java | 11 +++-
.../table/store/mapred/TableStoreInputSplit.java | 4 --
.../table/store/mapred/TableStoreRecordReader.java | 38 ++++++++++--
.../hive/TableStoreHiveStorageHandlerITCase.java | 40 ++++++++++++
.../store/mapred/TableStoreRecordReaderTest.java | 71 ++++++++++++++++++----
6 files changed, 142 insertions(+), 25 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
index c00ce576..9f45ec88 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
@@ -80,6 +80,9 @@ public class ProjectedRowData implements RowData {
@Override
public boolean isNullAt(int pos) {
+ if (indexMapping[pos] < 0) {
+ return true;
+ }
return row.isNullAt(indexMapping[pos]);
}
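For context: ProjectedRowData redirects every field access through an index mapping, and a negative entry means the requested column is absent from the underlying row, so the guard added above makes such positions read as null instead of indexing out of bounds. A minimal sketch of the behaviour (from(int[]) and replaceRow(RowData) are the factory and setter used elsewhere in this patch; the concrete values are illustrative only):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.utils.ProjectedRowData;

    // Mapping {2, 0, -1}: output position 0 reads input 2, position 1 reads
    // input 0, and position 2 maps to no input column at all.
    ProjectedRowData projected = ProjectedRowData.from(new int[] {2, 0, -1});
    projected.replaceRow(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
    projected.getInt(1);   // 1, read from input position 0
    projected.isNullAt(2); // true, via the negative-index guard added above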
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 91ef0bdc..bcdf4814 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Optional;
/**
@@ -55,7 +57,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
TableScan scan = table.newScan();
createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
return scan.plan().splits.stream()
- .map(split -> TableStoreInputSplit.create(table.location().toString(), split))
+ .map(split -> new TableStoreInputSplit(table.location().toString(), split))
.toArray(TableStoreInputSplit[]::new);
}
@@ -64,8 +66,11 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
FileStoreTable table = createFileStoreTable(jobConf);
TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
- long splitLength = split.getLength();
- return new TableStoreRecordReader(table.newRead().createReader(split.split()), splitLength);
+ return new TableStoreRecordReader(
+ table.newRead(),
+ split,
+ table.schema().fieldNames(),
+ Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
}
private FileStoreTable createFileStoreTable(JobConf jobConf) {
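For context: Hive advertises the columns a query actually reads through the job configuration, and the input format now forwards that list to the record reader alongside the full schema. A hedged sketch of the handshake, assuming the standard ColumnProjectionUtils constants (the column list is normally written by Hive's planner, not by user code):

    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.mapred.JobConf;
    import java.util.Arrays;
    import java.util.List;

    JobConf jobConf = new JobConf();
    // Hive would set this while planning e.g. "SELECT c, a FROM test_table".
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "c,a");
    // The input format reads it back exactly as in getRecordReader above.
    List<String> selected =
            Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf));
    // selected == [c, a]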
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 2c041305..7be634eb 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -50,10 +50,6 @@ public class TableStoreInputSplit extends FileSplit {
this.split = split;
}
- public static TableStoreInputSplit create(String path, Split split) {
- return new TableStoreInputSplit(path, split);
- }
-
public Split split() {
return split;
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
index bf1f493c..4be8cd20 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
@@ -22,38 +22,66 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.RowDataContainer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.ProjectedRowData;
import org.apache.hadoop.mapred.RecordReader;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.List;
/**
* Base {@link RecordReader} for table store. Reads {@link KeyValue}s from data files and picks out
* {@link RowData} for Hive to consume.
+ *
+ * <p>NOTE: To support projection push down, when {@code selectedColumns} does not match {@code
+ * columnNames} this reader will still produce records of the original schema. However, columns not
+ * in {@code selectedColumns} will be null.
*/
public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> {
private final RecordReaderIterator<RowData> iterator;
private final long splitLength;
+ @Nullable private final ProjectedRowData reusedProjectedRow;
+
private float progress;
public TableStoreRecordReader(
- org.apache.flink.table.store.file.utils.RecordReader<RowData> wrapped,
- long splitLength) {
- this.iterator = new RecordReaderIterator<>(wrapped);
- this.splitLength = splitLength;
+ TableRead read,
+ TableStoreInputSplit split,
+ List<String> columnNames,
+ List<String> selectedColumns)
+ throws IOException {
+ if (columnNames.equals(selectedColumns)) {
+ reusedProjectedRow = null;
+ } else {
+ read.withProjection(selectedColumns.stream().mapToInt(columnNames::indexOf).toArray());
+ reusedProjectedRow =
+ ProjectedRowData.from(
+ columnNames.stream().mapToInt(selectedColumns::indexOf).toArray());
+ }
+
+ this.iterator = new RecordReaderIterator<>(read.createReader(split.split()));
+ this.splitLength = split.getLength();
this.progress = 0;
}
@Override
public boolean next(Void key, RowDataContainer value) throws IOException {
RowData rowData = iterator.next();
+
if (rowData == null) {
progress = 1;
return false;
} else {
- value.set(rowData);
+ if (reusedProjectedRow != null) {
+ value.set(reusedProjectedRow.replaceRow(rowData));
+ } else {
+ value.set(rowData);
+ }
return true;
}
}
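For context: the constructor builds two complementary index arrays. The forward array passed to TableRead#withProjection lists, for each selected column, its position in the full schema, so only those columns are read from disk; the reverse array handed to ProjectedRowData lists, for each schema column, its position in the projected row, with -1 for columns that were never read (these surface as null, matching the ProjectedRowData fix above). A worked example using the shapes exercised in the tests below:

    import java.util.Arrays;
    import java.util.List;

    List<String> columnNames = Arrays.asList("a", "b", "c"); // full schema
    List<String> selectedColumns = Arrays.asList("c", "a");  // what Hive asked for

    // Forward mapping for TableRead#withProjection: read columns 2 and 0 only.
    int[] projection =
            selectedColumns.stream().mapToInt(columnNames::indexOf).toArray();
    // projection == {2, 0}

    // Reverse mapping for ProjectedRowData: present rows in schema order,
    // with -1 marking column b, which was never read.
    int[] indexMapping =
            columnNames.stream().mapToInt(selectedColumns::indexOf).toArray();
    // indexMapping == {1, -1, 0}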
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 8b691355..80f7d5b7 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -433,4 +433,44 @@ public class TableStoreHiveStorageHandlerITCase {
hiveShell.executeQuery(
"SELECT * FROM test_table WHERE ts = '2022-06-18 08:30:00'"));
}
+
+ @Test
+ public void testProjectionPushdown() throws Exception {
+ String path = folder.newFolder().toURI().toString();
+ Configuration conf = new Configuration();
+ conf.setString(CoreOptions.PATH, path);
+ conf.setInteger(CoreOptions.BUCKET, 2);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType()
+ },
+ new String[] {"a", "b", "c"}),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
+ commit.commit("0", write.prepareCommit());
+ write.close();
+
+ hiveShell.execute(
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE test_table",
+ "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'")));
+ List<String> actual = hiveShell.executeQuery("SELECT c, a FROM test_table ORDER BY a");
+ List<String> expected = Arrays.asList("Hi\t1", "Hello\t2", "World\t3");
+ Assert.assertEquals(expected, actual);
+ }
}
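The assertion holds because the record reader hands Hive full-width rows in schema order with the unselected column nulled out, and Hive applies its own projection on top. Illustratively, for the first row written above (container obtained from the reader as in TableStoreRecordReaderTest below; this is a reading of the contract stated in the TableStoreRecordReader javadoc, not output captured from the test):

    RowData row = container.get();          // full schema order (a, b, c)
    int a = row.getInt(0);                  // 1, selected and populated
    boolean bIsNull = row.isNullAt(1);      // true, b was never read from disk
    String c = row.getString(2).toString(); // "Hi", selected and populated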
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index fb68c2d5..4bf2744c 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.mapred;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
@@ -29,8 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.RowDataContainer;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
@@ -42,9 +39,11 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -81,8 +80,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
commit.commit("0", write.prepareCommit());
- Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
- TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
+ TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
Set<String> actual = new HashSet<>();
while (reader.next(null, container)) {
@@ -124,8 +122,7 @@ public class TableStoreRecordReaderTest {
write.write(GenericRowData.of(1, StringData.fromString("Hi")));
commit.commit("0", write.prepareCommit());
- Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
- TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
+ TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
RowDataContainer container = reader.createValue();
Map<String, Integer> actual = new HashMap<>();
while (reader.next(null, container)) {
@@ -140,14 +137,62 @@ public class TableStoreRecordReaderTest {
assertThat(actual).isEqualTo(expected);
}
- private Tuple2<RecordReader<RowData>, Long> read(
- FileStoreTable table, BinaryRowData partition, int bucket) throws Exception {
+ @Test
+ public void testProjectionPushdown() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setString(CoreOptions.PATH, tempDir.toString());
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType()
+ },
+ new String[] {"a", "b", "c"}),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ TableWrite write = table.newWrite();
+ TableCommit commit = table.newCommit("user");
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+ write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
+ write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+ commit.commit("0", write.prepareCommit());
+
+ TableStoreRecordReader reader =
+ read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
+ RowDataContainer container = reader.createValue();
+ Map<String, Integer> actual = new HashMap<>();
+ while (reader.next(null, container)) {
+ RowData rowData = container.get();
+ String key = rowData.getInt(0) + "|" + rowData.getString(2).toString();
+ actual.compute(key, (k, v) -> (v == null ? 0 : v) + 1);
+ }
+
+ Map<String, Integer> expected = new HashMap<>();
+ expected.put("1|Hi", 2);
+ expected.put("2|Hello", 1);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private TableStoreRecordReader read(FileStoreTable table, BinaryRowData partition, int bucket)
+ throws Exception {
+ return read(table, partition, bucket, table.schema().fieldNames());
+ }
+
+ private TableStoreRecordReader read(
+ FileStoreTable table, BinaryRowData partition, int bucket, List<String> selectedColumns)
+ throws Exception {
for (Split split : table.newScan().plan().splits) {
if (split.partition().equals(partition) && split.bucket() == bucket) {
- return Tuple2.of(
- table.newRead()
- .createReader(new Split(partition, bucket, split.files(), false)),
- split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+ return new TableStoreRecordReader(
+ table.newRead(),
+ new TableStoreInputSplit(tempDir.toString(), split),
+ table.schema().fieldNames(),
+ selectedColumns);
}
}
throw new IllegalArgumentException(