Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/07 09:34:09 UTC

[flink-table-store] branch master updated: [FLINK-28110] Support projection pushdown for Hive readers

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 952b62a7 [FLINK-28110] Support projection pushdown for Hive readers
952b62a7 is described below

commit 952b62a794928391f00f98218f5fab86b6bf5ee2
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jul 7 17:34:04 2022 +0800

    [FLINK-28110] Support projection pushdown for Hive readers
    
    This closes #202
---
 .../flink/table/store/utils/ProjectedRowData.java  |  3 +
 .../table/store/mapred/TableStoreInputFormat.java  | 11 +++-
 .../table/store/mapred/TableStoreInputSplit.java   |  4 --
 .../table/store/mapred/TableStoreRecordReader.java | 38 ++++++++++--
 .../hive/TableStoreHiveStorageHandlerITCase.java   | 40 ++++++++++++
 .../store/mapred/TableStoreRecordReaderTest.java   | 71 ++++++++++++++++++----
 6 files changed, 142 insertions(+), 25 deletions(-)

diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
index c00ce576..9f45ec88 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/ProjectedRowData.java
@@ -80,6 +80,9 @@ public class ProjectedRowData implements RowData {
 
     @Override
     public boolean isNullAt(int pos) {
+        if (indexMapping[pos] < 0) {
+            return true;
+        }
         return row.isNullAt(indexMapping[pos]);
     }
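
Why the isNullAt guard matters: the index mapping the Hive reader hands to ProjectedRowData contains -1 for every column that was not selected. Below is a minimal sketch of the resulting behavior, assuming the (a INT, b BIGINT, c STRING) schema used by the tests in this commit and a query that reads only c and a; the class name ProjectionNullExample is made up for illustration, while the ProjectedRowData calls (from, replaceRow, isNullAt) are the ones introduced here, assuming the other getters delegate through the same index mapping.

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.store.utils.ProjectedRowData;

    public class ProjectionNullExample {
        public static void main(String[] args) {
            // Original schema (a, b, c); only (c, a) were read.
            // Mapping is indexed by original position: a -> 1, b -> not read, c -> 0.
            ProjectedRowData projected = ProjectedRowData.from(new int[] {1, -1, 0});

            // The underlying reader produced only the selected columns, in (c, a) order.
            RowData selected = GenericRowData.of(StringData.fromString("Hi"), 1);
            RowData full = projected.replaceRow(selected);

            System.out.println(full.getInt(0));    // 1    (column a)
            System.out.println(full.isNullAt(1));  // true (column b was not selected)
            System.out.println(full.getString(2)); // Hi   (column c)
        }
    }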
 
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
index 91ef0bdc..bcdf4814 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Optional;
 
 /**
@@ -55,7 +57,7 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
         TableScan scan = table.newScan();
         createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter);
         return scan.plan().splits.stream()
-                .map(split -> TableStoreInputSplit.create(table.location().toString(), split))
+                .map(split -> new TableStoreInputSplit(table.location().toString(), split))
                 .toArray(TableStoreInputSplit[]::new);
     }
 
@@ -64,8 +66,11 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
         FileStoreTable table = createFileStoreTable(jobConf);
         TableStoreInputSplit split = (TableStoreInputSplit) inputSplit;
-        long splitLength = split.getLength();
-        return new TableStoreRecordReader(table.newRead().createReader(split.split()), splitLength);
+        return new TableStoreRecordReader(
+                table.newRead(),
+                split,
+                table.schema().fieldNames(),
+                Arrays.asList(ColumnProjectionUtils.getReadColumnNames(jobConf)));
     }
 
     private FileStoreTable createFileStoreTable(JobConf jobConf) {
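
For context on where selectedColumns comes from: getRecordReader now reads the column list that Hive records in the JobConf, via ColumnProjectionUtils.getReadColumnNames. A small sketch follows; Hive's planner normally populates this property itself, so setting it by hand is purely illustrative, and READ_COLUMN_NAMES_CONF_STR is assumed to be the serde2 constant backing that property.

    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.mapred.JobConf;

    import java.util.Arrays;

    public class ReadColumnsExample {
        public static void main(String[] args) {
            JobConf jobConf = new JobConf();
            // Normally written by Hive when planning e.g. "SELECT c, a FROM test_table".
            jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "c,a");

            String[] selected = ColumnProjectionUtils.getReadColumnNames(jobConf);
            System.out.println(Arrays.toString(selected)); // [c, a]
        }
    }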
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
index 2c041305..7be634eb 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
@@ -50,10 +50,6 @@ public class TableStoreInputSplit extends FileSplit {
         this.split = split;
     }
 
-    public static TableStoreInputSplit create(String path, Split split) {
-        return new TableStoreInputSplit(path, split);
-    }
-
     public Split split() {
         return split;
     }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
index bf1f493c..4be8cd20 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
@@ -22,38 +22,66 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.RowDataContainer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.ProjectedRowData;
 
 import org.apache.hadoop.mapred.RecordReader;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Base {@link RecordReader} for table store. Reads {@link KeyValue}s from data files and picks out
  * {@link RowData} for Hive to consume.
+ *
+ * <p>NOTE: To support projection push down, when {@code selectedColumns} does not match {@code
+ * columnNames} this reader will still produce records of the original schema. However, columns not
+ * in {@code selectedColumns} will be null.
  */
 public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> {
 
     private final RecordReaderIterator<RowData> iterator;
     private final long splitLength;
 
+    @Nullable private final ProjectedRowData reusedProjectedRow;
+
     private float progress;
 
     public TableStoreRecordReader(
-            org.apache.flink.table.store.file.utils.RecordReader<RowData> wrapped,
-            long splitLength) {
-        this.iterator = new RecordReaderIterator<>(wrapped);
-        this.splitLength = splitLength;
+            TableRead read,
+            TableStoreInputSplit split,
+            List<String> columnNames,
+            List<String> selectedColumns)
+            throws IOException {
+        if (columnNames.equals(selectedColumns)) {
+            reusedProjectedRow = null;
+        } else {
+            read.withProjection(selectedColumns.stream().mapToInt(columnNames::indexOf).toArray());
+            reusedProjectedRow =
+                    ProjectedRowData.from(
+                            columnNames.stream().mapToInt(selectedColumns::indexOf).toArray());
+        }
+
+        this.iterator = new RecordReaderIterator<>(read.createReader(split.split()));
+        this.splitLength = split.getLength();
         this.progress = 0;
     }
 
     @Override
     public boolean next(Void key, RowDataContainer value) throws IOException {
         RowData rowData = iterator.next();
+
         if (rowData == null) {
             progress = 1;
             return false;
         } else {
-            value.set(rowData);
+            if (reusedProjectedRow != null) {
+                value.set(reusedProjectedRow.replaceRow(rowData));
+            } else {
+                value.set(rowData);
+            }
             return true;
         }
     }
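
The new constructor derives two index arrays from the same pair of column lists: one pushed into TableRead.withProjection (the positions of the selected columns within the full schema) and one given to ProjectedRowData.from (for each full-schema column, its position within the selected columns, or -1 when it was not read). A minimal sketch of that arithmetic, using the a/b/c schema from the tests in this commit; the class name is made up for illustration.

    import java.util.Arrays;
    import java.util.List;

    public class ProjectionMappingExample {
        public static void main(String[] args) {
            List<String> columnNames = Arrays.asList("a", "b", "c");
            List<String> selectedColumns = Arrays.asList("c", "a");

            // Pushed down via TableRead.withProjection: read c and a,
            // i.e. original positions 2 and 0.
            int[] readProjection =
                    selectedColumns.stream().mapToInt(columnNames::indexOf).toArray();

            // Given to ProjectedRowData.from: map every original position back into
            // the projected row; -1 means "not read", reported as null by isNullAt.
            int[] rowMapping =
                    columnNames.stream().mapToInt(selectedColumns::indexOf).toArray();

            System.out.println(Arrays.toString(readProjection)); // [2, 0]
            System.out.println(Arrays.toString(rowMapping));     // [1, -1, 0]
        }
    }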
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 8b691355..80f7d5b7 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -433,4 +433,44 @@ public class TableStoreHiveStorageHandlerITCase {
                 hiveShell.executeQuery(
                         "SELECT * FROM test_table WHERE ts = '2022-06-18 08:30:00'"));
     }
+
+    @Test
+    public void testProjectionPushdown() throws Exception {
+        String path = folder.newFolder().toURI().toString();
+        Configuration conf = new Configuration();
+        conf.setString(CoreOptions.PATH, path);
+        conf.setInteger(CoreOptions.BUCKET, 2);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType()
+                                },
+                                new String[] {"a", "b", "c"}),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
+        commit.commit("0", write.prepareCommit());
+        write.close();
+
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE test_table",
+                                "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
+                                "LOCATION '" + path + "'")));
+        List<String> actual = hiveShell.executeQuery("SELECT c, a FROM test_table ORDER BY a");
+        List<String> expected = Arrays.asList("Hi\t1", "Hello\t2", "World\t3");
+        Assert.assertEquals(expected, actual);
+    }
 }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index fb68c2d5..4bf2744c 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.mapred;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
@@ -29,8 +28,6 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.RowDataContainer;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
@@ -42,9 +39,11 @@ import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -81,8 +80,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
         commit.commit("0", write.prepareCommit());
 
-        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
-        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
+        TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
         Set<String> actual = new HashSet<>();
         while (reader.next(null, container)) {
@@ -124,8 +122,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         commit.commit("0", write.prepareCommit());
 
-        Tuple2<RecordReader<RowData>, Long> tuple = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
-        TableStoreRecordReader reader = new TableStoreRecordReader(tuple.f0, tuple.f1);
+        TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
         Map<String, Integer> actual = new HashMap<>();
         while (reader.next(null, container)) {
@@ -140,14 +137,62 @@ public class TableStoreRecordReaderTest {
         assertThat(actual).isEqualTo(expected);
     }
 
-    private Tuple2<RecordReader<RowData>, Long> read(
-            FileStoreTable table, BinaryRowData partition, int bucket) throws Exception {
+    @Test
+    public void testProjectionPushdown() throws Exception {
+        Configuration conf = new Configuration();
+        conf.setString(CoreOptions.PATH, tempDir.toString());
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType()
+                                },
+                                new String[] {"a", "b", "c"}),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
+        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
+        commit.commit("0", write.prepareCommit());
+
+        TableStoreRecordReader reader =
+                read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
+        RowDataContainer container = reader.createValue();
+        Map<String, Integer> actual = new HashMap<>();
+        while (reader.next(null, container)) {
+            RowData rowData = container.get();
+            String key = rowData.getInt(0) + "|" + rowData.getString(2).toString();
+            actual.compute(key, (k, v) -> (v == null ? 0 : v) + 1);
+        }
+
+        Map<String, Integer> expected = new HashMap<>();
+        expected.put("1|Hi", 2);
+        expected.put("2|Hello", 1);
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private TableStoreRecordReader read(FileStoreTable table, BinaryRowData partition, int bucket)
+            throws Exception {
+        return read(table, partition, bucket, table.schema().fieldNames());
+    }
+
+    private TableStoreRecordReader read(
+            FileStoreTable table, BinaryRowData partition, int bucket, List<String> selectedColumns)
+            throws Exception {
         for (Split split : table.newScan().plan().splits) {
             if (split.partition().equals(partition) && split.bucket() == bucket) {
-                return Tuple2.of(
-                        table.newRead()
-                                .createReader(new Split(partition, bucket, split.files(), false)),
-                        split.files().stream().mapToLong(DataFileMeta::fileSize).sum());
+                return new TableStoreRecordReader(
+                        table.newRead(),
+                        new TableStoreInputSplit(tempDir.toString(), split),
+                        table.schema().fieldNames(),
+                        selectedColumns);
             }
         }
         throw new IllegalArgumentException(