Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/18 08:53:22 UTC
[flink] branch release-1.10 updated: [FLINK-16413][hive] Reduce hive source parallelism when limit push down
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new c4245bdc [FLINK-16413][hive] Reduce hive source parallelism when limit push down
c4245bdc is described below
commit c4245bdcd2e9ba48cdab9760d8120df60aa0e2d5
Author: JunZhang <zh...@126.com>
AuthorDate: Wed Mar 18 16:53:06 2020 +0800
[FLINK-16413][hive] Reduce hive source parallelism when limit push down
This closes #11429
---
.../flink/connectors/hive/HiveTableSource.java | 7 ++++-
.../flink/connectors/hive/HiveTableSourceTest.java | 34 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 5d654cf..98691b9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -138,6 +139,7 @@ public class HiveTableSource implements
HiveTableInputFormat inputFormat = getInputFormat(allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);
+ int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
if (max < 1) {
@@ -158,8 +160,11 @@ public class HiveTableSource implements
} catch (IOException e) {
throw new FlinkHiveException(e);
}
- source.setParallelism(Math.min(Math.max(1, splitNum), max));
+ parallelism = Math.min(splitNum, max);
}
+ parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
+ parallelism = Math.max(1, parallelism);
+ source.setParallelism(parallelism);
return source.name(explainSource());
}
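Pulled out of the diff for readability, the resulting parallelism selection reads roughly as below. This is a condensed sketch, not the committed method: the helper name deriveParallelism and its parameter list are made up for illustration; only the arithmetic follows the patch.

static int deriveParallelism(
        int defaultParallelism,   // TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM
        boolean inferEnabled,     // TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM
        int splitNum,             // number of input splits
        int inferMax,             // TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX
        long limit) {             // pushed-down LIMIT, <= 0 when absent
    int parallelism = defaultParallelism;
    if (inferEnabled) {
        // one task per split, capped by the configured maximum
        parallelism = Math.min(splitNum, inferMax);
    }
    if (limit > 0) {
        // heuristic from the patch: roughly 1000 rows per task
        parallelism = Math.min(parallelism, (int) limit / 1000);
    }
    // never drop below a single task
    return Math.max(1, parallelism);
}

For the new test below (inference off, default parallelism 2, LIMIT 1) this gives min(2, 1/1000) = min(2, 0) = 0, clamped to 1, which is exactly the parallelism the test asserts on the source transformation.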
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 18cdea2..c6a1043 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -27,10 +27,13 @@ import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
import org.apache.flink.connectors.hive.read.HiveTableInputSplit;
import org.apache.flink.connectors.hive.read.HiveVectorizedOrcSplitReader;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.table.HiveVersionTestUtil;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
@@ -414,6 +417,37 @@ public class HiveTableSourceTest {
}
@Test
+ public void testParallelismOnLimitPushDown() {
+ final String catalogName = "hive";
+ final String dbName = "source_db";
+ final String tblName = "test_parallelism_limit_pushdown";
+ hiveShell.execute("CREATE TABLE source_db.test_parallelism_limit_pushdown " +
+ "(year STRING, value INT) partitioned by (pt int);");
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2014", 3})
+ .addRow(new Object[]{"2014", 4})
+ .commit("pt=0");
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2015", 2})
+ .addRow(new Object[]{"2015", 5})
+ .commit("pt=1");
+ TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+ tEnv.getConfig().getConfiguration().setBoolean(
+ HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
+ tEnv.getConfig().getConfiguration().setInteger(
+ ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+ tEnv.registerCatalog(catalogName, hiveCatalog);
+ Table table = tEnv.sqlQuery("select * from hive.source_db.test_parallelism_limit_pushdown limit 1");
+ PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
+ RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
+ ExecNode execNode = planner.translateToExecNodePlan(toScala(Collections.singletonList(relNode))).get(0);
+ @SuppressWarnings("unchecked")
+ Transformation transformation = execNode.translateToPlan(planner);
+ Assert.assertEquals(1, ((PartitionTransformation) ((OneInputTransformation) transformation).getInput())
+ .getInput().getParallelism());
+ }
+
+ @Test
public void testVectorReaderSwitch() throws Exception {
// vector reader not available for 1.x and we're not testing orc for 2.0.x
Assume.assumeTrue(HiveVersionTestUtil.HIVE_210_OR_LATER);
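Circling back to the new test above: for anyone wanting to reproduce the behavior outside the test harness, a minimal usage sketch along these lines should work against the 1.10 API. Catalog registration and table creation are elided, and the table name is made up; only the two options mirror the test.

import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

// Batch mode on the Blink planner, as in the test.
TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
// Turn off split-based inference so the default parallelism is the baseline...
tEnv.getConfig().getConfiguration().setBoolean(
        HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, false);
// ...and set that baseline to 2.
tEnv.getConfig().getConfiguration().setInteger(
        ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
// With this patch, LIMIT 1 caps the Hive source at min(2, 1/1000), clamped to 1.
Table t = tEnv.sqlQuery("select * from hive.source_db.my_table limit 1");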