You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2021/05/20 08:18:11 UTC

[flink] branch release-1.13 updated: [FLINK-22661][hive] HiveInputFormatPartitionReader can return invalid data

This is an automated email from the ASF dual-hosted git repository.

lirui pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new c88c0c9  [FLINK-22661][hive] HiveInputFormatPartitionReader can return invalid data
c88c0c9 is described below

commit c88c0c98198b061e351d7574a5c5b891ba38116b
Author: Rui Li <li...@apache.org>
AuthorDate: Fri May 14 16:02:18 2021 +0800

    [FLINK-22661][hive] HiveInputFormatPartitionReader can return invalid data
---
 .../hive/read/HiveInputFormatPartitionReader.java  |  22 +++--
 .../read/HiveInputFormatPartitionReaderITCase.java | 101 +++++++++++++++++++++
 2 files changed, 115 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
index 266c294..b9eb704 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReader.java
@@ -89,7 +89,7 @@ public class HiveInputFormatPartitionReader
 
     @Override
     public RowData read(RowData reuse) throws IOException {
-        while (hasNext()) {
+        if (hasNext()) {
             return hiveTableInputFormat.nextRecord(reuse);
         }
         return null;
@@ -97,19 +97,25 @@ public class HiveInputFormatPartitionReader
 
     private boolean hasNext() throws IOException {
         if (inputSplits.length > 0) {
-            if (hiveTableInputFormat.reachedEnd() && readingSplitId == inputSplits.length - 1) {
-                return false;
-            } else if (hiveTableInputFormat.reachedEnd()) {
-                readingSplitId++;
-                hiveTableInputFormat.open(inputSplits[readingSplitId]);
+            if (hiveTableInputFormat.reachedEnd()) {
+                if (readingSplitId < inputSplits.length - 1) {
+                    // switch to next split
+                    hiveTableInputFormat.close();
+                    readingSplitId++;
+                    hiveTableInputFormat.open(inputSplits[readingSplitId]);
+                    return hasNext();
+                }
+            } else {
+                return true;
             }
-            return true;
         }
         return false;
     }
 
     @Override
     public void close() throws IOException {
-        hiveTableInputFormat.close();
+        if (hiveTableInputFormat != null) {
+            hiveTableInputFormat.close();
+        }
     }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
new file mode 100644
index 0000000..48588d4
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/read/HiveInputFormatPartitionReaderITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for HiveInputFormatPartitionReader. */
+public class HiveInputFormatPartitionReaderITCase {
+
+    @Test
+    public void testReadMultipleSplits() throws Exception {
+        HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog();
+        TableEnvironment tableEnv =
+                HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+
+        testReadFormat(tableEnv, hiveCatalog, "orc");
+        testReadFormat(tableEnv, hiveCatalog, "parquet");
+    }
+
+    private void testReadFormat(TableEnvironment tableEnv, HiveCatalog hiveCatalog, String format)
+            throws Exception {
+        String tableName = prepareData(tableEnv, format);
+        ObjectPath tablePath = new ObjectPath("default", tableName);
+        TableSchema tableSchema = hiveCatalog.getTable(tablePath).getSchema();
+        // create partition reader
+        HiveInputFormatPartitionReader partitionReader =
+                new HiveInputFormatPartitionReader(
+                        new JobConf(hiveCatalog.getHiveConf()),
+                        hiveCatalog.getHiveVersion(),
+                        tablePath,
+                        tableSchema.getFieldDataTypes(),
+                        tableSchema.getFieldNames(),
+                        Collections.emptyList(),
+                        null,
+                        false);
+        Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+        // create HiveTablePartition to read from
+        HiveTablePartition tablePartition =
+                new HiveTablePartition(
+                        hiveTable.getSd(),
+                        HiveReflectionUtils.getTableMetadata(
+                                HiveShimLoader.loadHiveShim(hiveCatalog.getHiveVersion()),
+                                hiveTable));
+        partitionReader.open(Collections.singletonList(tablePartition));
+        GenericRowData reuse = new GenericRowData(tableSchema.getFieldCount());
+        int count = 0;
+        // this follows the way the partition reader is used during lookup join
+        while (partitionReader.read(reuse) != null) {
+            count++;
+        }
+        assertEquals(
+                CollectionUtil.iteratorToList(
+                                tableEnv.executeSql("select * from " + tableName).collect())
+                        .size(),
+                count);
+    }
+
+    private String prepareData(TableEnvironment tableEnv, String format) throws Exception {
+        String tableName = format + "_table";
+        tableEnv.executeSql(
+                String.format("create table %s (i int,s string) stored as %s", tableName, format));
+        tableEnv.executeSql(String.format("insert into %s values (1,'a')", tableName)).await();
+        tableEnv.executeSql(String.format("insert into %s values (2,'b')", tableName)).await();
+        return tableName;
+    }
+}