Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/18 08:53:22 UTC

[flink] branch release-1.10 updated: [FLINK-16413][hive] Reduce hive source parallelism when limit push down

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c4245bdc [FLINK-16413][hive] Reduce hive source parallelism when limit push down
c4245bdc is described below

commit c4245bdcd2e9ba48cdab9760d8120df60aa0e2d5
Author: JunZhang <zh...@126.com>
AuthorDate: Wed Mar 18 16:53:06 2020 +0800

    [FLINK-16413][hive] Reduce hive source parallelism when limit push down
    This closes #11429
---
 .../flink/connectors/hive/HiveTableSource.java     |  7 ++++-
 .../flink/connectors/hive/HiveTableSourceTest.java | 34 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 5d654cf..98691b9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -138,6 +139,7 @@ public class HiveTableSource implements
 		HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
 		DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);
 
+		int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
 		if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
 			int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
 			if (max < 1) {
@@ -158,8 +160,11 @@ public class HiveTableSource implements
 			} catch (IOException e) {
 				throw new FlinkHiveException(e);
 			}
-			source.setParallelism(Math.min(Math.max(1, splitNum), max));
+			parallelism = Math.min(splitNum, max);
 		}
+		parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
+		parallelism = Math.max(1, parallelism);
+		source.setParallelism(parallelism);
 		return source.name(explainSource());
 	}
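
Read as a whole, the new logic derives the source parallelism in three steps: start from the configured default, optionally cap it by the inferred split count, then cap it again by limit/1000 when a limit was pushed down, clamping the result to at least 1. The standalone sketch below restates that derivation for readability; the method name and parameter list are illustrative only and do not appear in the commit:

    // A minimal sketch of the derivation in the hunk above; the method and
    // parameter names are hypothetical, not part of HiveTableSource.
    static int deriveSourceParallelism(
            int defaultParallelism,   // TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM
            boolean inferParallelism, // TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM
            int splitNum,             // number of input splits, used only when inferring
            int inferMax,             // TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX
            long limit) {             // pushed-down limit; non-positive means no limit
        int parallelism = defaultParallelism;
        if (inferParallelism) {
            parallelism = Math.min(splitNum, inferMax);
        }
        if (limit > 0) {
            // Heuristic from the commit: budget roughly 1000 rows per subtask,
            // so small limits collapse to very few subtasks.
            parallelism = Math.min(parallelism, (int) (limit / 1000));
        }
        // The caps above can produce 0 (e.g. any limit below 1000); never go below 1.
        return Math.max(1, parallelism);
    }

For a query ending in LIMIT 1, for instance, the limit cap yields min(parallelism, 0) and the final clamp raises it to 1, so the scan runs as a single task instead of fanning out to the configured default.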
 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..c6a1043 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -27,10 +27,13 @@ import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
 import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
 import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.table.HiveVersionTestUtil;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -414,6 +417,37 @@ public class HiveTableSourceTest {
 	}
 
 	@Test
+	public void testParallelismOnLimitPushDown() {
+		final String catalogName = "hive";
+		final String dbName = "source_db";
+		final String tblName = "test_parallelism_limit_pushdown";
+		hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+					"(year STRING, value INT) partitioned by (pt int);");
+		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+					.addRow(new Object[]{"2014", 3})
+					.addRow(new Object[]{"2014", 4})
+					.commit("pt=0");
+		HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+					.addRow(new Object[]{"2015", 2})
+					.addRow(new Object[]{"2015", 5})
+					.commit("pt=1");
+		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		tEnv.getConfig().getConfiguration().setBoolean(
+			HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+		tEnv.getConfig().getConfiguration().setInteger(
+			ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+		tEnv.registerCatalog(catalogName, hiveCatalog);
+		Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
+		PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
+		RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+		ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
+		@SuppressWarnings("unchecked")
+		Transformation transformation = execNode.translateToPlan(planner);
+		Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
+			.getInput().getParallelism());
+	}
+
+	@Test
 	public void testVectorReaderSwitch() throws Exception {
 		// vector reader not available for 1.x and we're not testing orc for 2.0.x
 		Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
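
Outside the planner-level test above, the same behavior can be exercised from an application by setting the two options on the TableEnvironment before issuing a limited query. A hedged usage sketch, assuming Flink 1.10 with the Blink batch planner and a HiveCatalog instance (hiveCatalog) constructed elsewhere; the catalog and table names are carried over from the test case:

    import org.apache.flink.connectors.hive.HiveOptions;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    // Illustrative configuration mirroring the test; "hive" and the table
    // name are assumptions taken from the test case above.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner().inBatchMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    tEnv.getConfig().getConfiguration().setBoolean(
            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
    tEnv.getConfig().getConfiguration().setInteger(
            ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    tEnv.registerCatalog("hive", hiveCatalog);
    // With the limit pushed down, the Hive source runs with parallelism 1
    // rather than the configured default of 2.
    Table result = tEnv.sqlQuery(
            "select * from hive.source_db.test_parallelism_limit_pushdown limit 1");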