Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/16 21:50:03 UTC

[flink] branch master updated: [FLINK-13747][hive] Remove some TODOs in Hive connector

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a6571bb  [FLINK-13747][hive] Remove some TODOs in Hive connector
a6571bb is described below

commit a6571bb61f41a65b47ec250231a32e14b1390069
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Aug 16 15:46:00 2019 +0800

    [FLINK-13747][hive] Remove some TODOs in Hive connector
    
    This PR removes some obsolete TODOs.
    
    This closes #9460.
---
 .../flink/connectors/hive/FlinkStandaloneHiveRunner.java       |  1 -
 .../flink/connectors/hive/TableEnvHiveConnectorTest.java       | 10 ++++++++--
 .../flink/table/planner/runtime/utils/BatchTableEnvUtil.scala  |  7 ++++---
 .../apache/flink/table/planner/runtime/utils/TableUtil.scala   |  8 ++++++--
 .../org/apache/flink/table/planner/utils/TableTestBase.scala   |  3 +--
 5 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 35dc892..343a299 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -355,7 +355,6 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
 		args.add(System.getProperty("java.class.path"));
 
 		// set sys properties
-		// TODO: generate hive-site.xml at runtime?
 		args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
 		args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
 		args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
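Context for the removed TODO: rather than generating a hive-site.xml at runtime, the runner already forwards each HiveConf variable to the forked metastore process as a command-line argument, which is why the TODO is obsolete. A minimal sketch of that idea, assuming a hypothetical --hiveconf=key=value flag format (the real format is defined by hiveCmdLineConfig in this file):

    // Sketch only: pass configuration to a child process as CLI flags instead
    // of writing a config file. The exact flag syntax here is an assumption.
    def hiveCmdLineConfig(name: String, value: String): String =
      s"--hiveconf=$name=$value"

    val args = scala.collection.mutable.ArrayBuffer[String]()
    // "hive.metastore.warehouse.dir" is the key behind METASTOREWAREHOUSE.varname
    args += hiveCmdLineConfig("hive.metastore.warehouse.dir", "/tmp/warehouse")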
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 0845eae..bf6ec2f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -19,11 +19,14 @@
 package org.apache.flink.connectors.hive;
 
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -39,6 +42,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
+import scala.collection.JavaConverters;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -81,8 +86,9 @@ public class TableEnvHiveConnectorTest {
 		FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
 		assertTrue(fs.exists(defaultPartPath));
 
-		// TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
-		assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
+		TableImpl flinkTable = (TableImpl) tableEnv.sqlQuery("select * from db1.part order by x");
+		List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect(flinkTable)).asJava();
+		assertEquals(Arrays.toString(new String[]{"1,1", "2,null"}), rows.toString());
 
 		hiveShell.execute("drop database db1 cascade");
 	}
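With FLINK-13279 resolved, the test now reads the partitioned table back through Flink itself instead of going through hiveShell. TableUtil.collect returns a Scala Seq, which is why the Java code above bridges it with JavaConverters. The same pattern in Scala, assuming a TableEnvironment named tableEnv with the Hive catalog already registered:

    import org.apache.flink.table.api.internal.TableImpl
    import org.apache.flink.table.planner.runtime.utils.TableUtil
    import org.apache.flink.types.Row

    // Run the query through Flink and collect the results for assertion.
    val flinkTable = tableEnv.sqlQuery("select * from db1.part order by x").asInstanceOf[TableImpl]
    val rows: Seq[Row] = TableUtil.collect(flinkTable)
    assert(rows.map(_.toString) == Seq("1,1", "2,null"))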
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
index fca9c1d..7305f53 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
@@ -33,7 +33,6 @@ import org.apache.flink.table.planner.sinks.CollectTableSink
 import org.apache.flink.table.planner.utils.TableTestUtil
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.util.AbstractID
-
 import _root_.java.util.{UUID, ArrayList => JArrayList}
 
 import _root_.scala.collection.JavaConversions._
@@ -46,7 +45,9 @@ object BatchTableEnvUtil {
       tEnv: TableEnvironment,
       table: Table,
       sink: CollectTableSink[T],
-      jobName: Option[String]): Seq[T] = {
+      jobName: Option[String],
+      builtInCatalogName: String,
+      builtInDBName: String): Seq[T] = {
     val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
       .asInstanceOf[TypeInformation[T]]
       .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
@@ -55,7 +56,7 @@ object BatchTableEnvUtil {
     sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
     val sinkName = UUID.randomUUID().toString
     tEnv.registerTableSink(sinkName, sink)
-    tEnv.insertInto(table, sinkName)
+    tEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName)
 
     val res = tEnv.execute("test")
     val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
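The two new parameters let insertInto address the temporary collect sink by its fully qualified path, since the sink is registered under the built-in catalog and database. A hypothetical call site, assuming a Table named table and an already-configured CollectTableSink named sink:

    import org.apache.flink.table.api.EnvironmentSettings

    val results = BatchTableEnvUtil.collect(
      tEnv, table, sink, Some("test"),
      EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
      EnvironmentSettings.DEFAULT_BUILTIN_DATABASE)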
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
index 1202e22..6b98d2c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.runtime.utils
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.EnvironmentSettings
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
@@ -52,7 +53,9 @@ object TableUtil {
       new CollectTableSink(_ => t.asInstanceOf[TypeInformation[T]]), Option(jobName))
 
   def collectSink[T](
-      table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
+      table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None,
+      builtInCatalogName: String = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+      builtInDBName: String = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE): Seq[T] = {
     // get schema information of table
     val relNode = TableTestUtil.toRelNode(table)
     val rowType = relNode.getRowType
@@ -71,7 +74,8 @@ object TableUtil {
     val configuredSink = sink.configure(
       fieldNames, fieldTypes.map(TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo))
     BatchTableEnvUtil.collect(table.getTableEnvironment,
-      table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
+      table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName,
+      builtInCatalogName, builtInDBName)
   }
 
 }
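Because the new parameters default to EnvironmentSettings.DEFAULT_BUILTIN_CATALOG and DEFAULT_BUILTIN_DATABASE, existing call sites stay source-compatible; only tests running against a non-default built-in catalog need to pass names explicitly. For example (the catalog and database names in the second call are hypothetical):

    // Existing callers are unchanged and get the built-in defaults:
    val rows = TableUtil.collectSink(tableImpl, sink)
    // Callers using a custom built-in catalog opt in explicitly:
    val custom = TableUtil.collectSink(tableImpl, sink, None, "my_catalog", "my_db")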
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index eb8aa57..0201d77 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -953,8 +953,7 @@ class TestingTableEnvironment private(
   }
 
   override def insertInto(table: Table, path: String, pathContinued: String*): Unit = {
-    val fullPath = List(path)
-    fullPath.addAll(pathContinued)
+    val fullPath = List(path) ++ pathContinued.toList
 
     val modifyOperations = List(new CatalogSinkModifyOperation(fullPath, table.getQueryOperation))
     if (isEagerOperationTranslation) {
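The TableTestBase change above is a real bug fix, not just a TODO removal: List(path) builds a scala.collection.immutable.List, which has no addAll method, so the old line presumably compiled only through an implicit conversion to a java.util.List view and would throw UnsupportedOperationException as soon as pathContinued was non-empty. Concatenation with ++ builds a new list instead of mutating, as a quick sketch shows:

    // Immutable List concatenation: ++ returns a fresh list, nothing is mutated.
    val path = "db1"
    val pathContinued = Seq("tbl")                     // stand-in for the varargs
    val fullPath = List(path) ++ pathContinued.toList  // List("db1", "tbl")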