You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/08/16 21:50:03 UTC
[flink] branch master updated: [FLINK-13747][hive] Remove some TODOs in Hive connector
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a6571bb [FLINK-13747][hive] Remove some TODOs in Hive connector
a6571bb is described below
commit a6571bb61f41a65b47ec250231a32e14b1390069
Author: Rui Li <li...@apache.org>
AuthorDate: Fri Aug 16 15:46:00 2019 +0800
[FLINK-13747][hive] Remove some TODOs in Hive connector
This PR is to fix some obsolete TODOs.
This closes #9460.
---
.../flink/connectors/hive/FlinkStandaloneHiveRunner.java | 1 -
.../flink/connectors/hive/TableEnvHiveConnectorTest.java | 10 ++++++++--
.../flink/table/planner/runtime/utils/BatchTableEnvUtil.scala | 7 ++++---
.../apache/flink/table/planner/runtime/utils/TableUtil.scala | 8 ++++++--
.../org/apache/flink/table/planner/utils/TableTestBase.scala | 3 +--
5 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 35dc892..343a299 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -355,7 +355,6 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
args.add(System.getProperty("java.class.path"));
// set sys properties
- // TODO: generate hive-site.xml at runtime?
args.add(hiveCmdLineConfig(METASTOREWAREHOUSE.varname, outsideConf.getVar(METASTOREWAREHOUSE)));
args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 0845eae..bf6ec2f 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -19,11 +19,14 @@
package org.apache.flink.connectors.hive;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.planner.runtime.utils.TableUtil;
+import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -39,6 +42,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import scala.collection.JavaConverters;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -81,8 +86,9 @@ public class TableEnvHiveConnectorTest {
FileSystem fs = defaultPartPath.getFileSystem(hiveConf);
assertTrue(fs.exists(defaultPartPath));
- // TODO: test reading from flink when https://issues.apache.org/jira/browse/FLINK-13279 is fixed
- assertEquals(Arrays.asList("1\t1", "2\tNULL"), hiveShell.executeQuery("select * from db1.part"));
+ TableImpl flinkTable = (TableImpl) tableEnv.sqlQuery("select * from db1.part order by x");
+ List<Row> rows = JavaConverters.seqAsJavaListConverter(TableUtil.collect(flinkTable)).asJava();
+ assertEquals(Arrays.toString(new String[]{"1,1", "2,null"}), rows.toString());
hiveShell.execute("drop database db1 cascade");
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
index fca9c1d..7305f53 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTableEnvUtil.scala
@@ -33,7 +33,6 @@ import org.apache.flink.table.planner.sinks.CollectTableSink
import org.apache.flink.table.planner.utils.TableTestUtil
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
import org.apache.flink.util.AbstractID
-
import _root_.java.util.{UUID, ArrayList => JArrayList}
import _root_.scala.collection.JavaConversions._
@@ -46,7 +45,9 @@ object BatchTableEnvUtil {
tEnv: TableEnvironment,
table: Table,
sink: CollectTableSink[T],
- jobName: Option[String]): Seq[T] = {
+ jobName: Option[String],
+ builtInCatalogName: String,
+ builtInDBName: String): Seq[T] = {
val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
.asInstanceOf[TypeInformation[T]]
.createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
@@ -55,7 +56,7 @@ object BatchTableEnvUtil {
sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
val sinkName = UUID.randomUUID().toString
tEnv.registerTableSink(sinkName, sink)
- tEnv.insertInto(table, sinkName)
+ tEnv.insertInto(table, builtInCatalogName, builtInDBName, sinkName)
val res = tEnv.execute("test")
val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
index 1202e22..6b98d2c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/TableUtil.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.runtime.utils
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.internal.TableImpl
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
@@ -52,7 +53,9 @@ object TableUtil {
new CollectTableSink(_ => t.asInstanceOf[TypeInformation[T]]), Option(jobName))
def collectSink[T](
- table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None): Seq[T] = {
+ table: TableImpl, sink: CollectTableSink[T], jobName: Option[String] = None,
+ builtInCatalogName: String = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG,
+ builtInDBName: String = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE): Seq[T] = {
// get schema information of table
val relNode = TableTestUtil.toRelNode(table)
val rowType = relNode.getRowType
@@ -71,7 +74,8 @@ object TableUtil {
val configuredSink = sink.configure(
fieldNames, fieldTypes.map(TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo))
BatchTableEnvUtil.collect(table.getTableEnvironment,
- table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName)
+ table, configuredSink.asInstanceOf[CollectTableSink[T]], jobName,
+ builtInCatalogName, builtInDBName)
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index eb8aa57..0201d77 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -953,8 +953,7 @@ class TestingTableEnvironment private(
}
override def insertInto(table: Table, path: String, pathContinued: String*): Unit = {
- val fullPath = List(path)
- fullPath.addAll(pathContinued)
+ val fullPath = List(path) ++ pathContinued.toList
val modifyOperations = List(new CatalogSinkModifyOperation(fullPath, table.getQueryOperation))
if (isEagerOperationTranslation) {