You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/16 01:56:56 UTC

[flink] branch master updated: [FLINK-13688][hive] Limit the parallelism/memory of HiveCatalogUseBlinkITCase

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a194b37  [FLINK-13688][hive] Limit the parallelism/memory of HiveCatalogUseBlinkITCase
a194b37 is described below

commit a194b37d9b99a47174de9108a937f821816d61f5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Aug 12 10:24:39 2019 +0200

    [FLINK-13688][hive] Limit the parallelism/memory of HiveCatalogUseBlinkITCase
    
    This closes #9417
---
 flink-connectors/flink-connector-hive/pom.xml            |  9 ++++++++-
 .../table/catalog/hive/HiveCatalogUseBlinkITCase.java    |  6 +++++-
 .../table/planner/runtime/utils/BatchTestBase.scala      | 16 ++++++++++------
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 017abe5..d8d00a3 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -616,7 +616,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-	</dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${project.version}</version>
+			<scope>test</scope>
+        </dependency>
+
+    </dependencies>
 
 	<build>
 		<plugins>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index 8308efb..d610125 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.descriptors.OldCsv;
 import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.FileUtils;
 
 import com.klarna.hiverunner.HiveShell;
@@ -65,7 +67,7 @@ import static java.lang.String.format;
  * TODO: move to flink-connector-hive-test end-to-end test module once it's setup
  */
 @RunWith(FlinkStandaloneHiveRunner.class)
-public class HiveCatalogUseBlinkITCase {
+public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 
 	@HiveSQL(files = {})
 	private static HiveShell hiveShell;
@@ -97,6 +99,8 @@ public class HiveCatalogUseBlinkITCase {
 		TableEnvironment tEnv = TableEnvironment.create(
 				EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
 
+		BatchTestBase.configForMiniCluster(tEnv.getConfig());
+
 		tEnv.registerCatalog("myhive", hiveCatalog);
 		tEnv.useCatalog("myhive");
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 76c2171..c75aff8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -69,12 +69,7 @@ class BatchTestBase extends BatchAbstractTestBase {
 
   @Before
   def before(): Unit = {
-    conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
-    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "2mb")
-    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY, "2mb")
-    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_SORT_MEMORY, "1mb")
-    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY, "1mb")
-    conf.getConfiguration.setString(TABLE_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
+    BatchTestBase.configForMiniCluster(conf)
   }
 
   /**
@@ -477,4 +472,13 @@ object BatchTestBase {
         assertEquals(msg, e, r)
     }
   }
+
+  def configForMiniCluster(conf: TableConfig): Unit = {
+    conf.getConfiguration.setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, DEFAULT_PARALLELISM)
+    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "2mb")
+    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY, "2mb")
+    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_SORT_MEMORY, "1mb")
+    conf.getConfiguration.setString(TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY, "1mb")
+    conf.getConfiguration.setString(TABLE_EXEC_SHUFFLE_MODE, ShuffleMode.PIPELINED.toString)
+  }
 }