Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/15 16:31:09 UTC
[flink] branch master updated: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d913de1 [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
d913de1 is described below
commit d913de10094dc9d63f9a4599da033d89d25154d2
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Jul 10 17:59:36 2019 +0800
[FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
This PR makes HiveTableSink implement OverwritableTableSink.
This closes #9067.
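
With this change, overwrite mode is enabled by calling setOverwrite on the
sink before it is registered. A minimal sketch, mirroring the updated
HiveTableSinkTest below (the hiveConf, tablePath, catalogTable, and tableEnv
setup is assumed from the test's surrounding context):

    // Build the sink as usual, then flag it for INSERT OVERWRITE semantics.
    HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable);
    // setOverwrite(boolean) is the single method OverwritableTableSink adds;
    // passing true makes the sink replace existing data instead of appending.
    sink.setOverwrite(true);
    tableEnv.registerTableSink("destSink", sink);
    tableEnv.sqlQuery("select * from src").insertInto("destSink");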
---
.../connectors/hive/HiveTableOutputFormat.java | 4 +-
.../flink/batch/connectors/hive/HiveTableSink.java | 9 +-
.../connectors/hive/HiveTableOutputFormatTest.java | 177 ---------------------
.../batch/connectors/hive/HiveTableSinkTest.java | 77 +++++----
4 files changed, 53 insertions(+), 214 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 898ab53..acb1bf9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -160,6 +160,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
this.overwrite = overwrite;
isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
+ hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
}
// Custom serialization methods
@@ -175,6 +176,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
out.writeObject(partitionColumns);
out.writeObject(tablePath);
out.writeObject(tableProperties);
+ out.writeObject(hiveVersion);
}
@SuppressWarnings("unchecked")
@@ -198,6 +200,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
tablePath = (ObjectPath) in.readObject();
partitionToWriter = new HashMap<>();
tableProperties = (Properties) in.readObject();
+ hiveVersion = (String) in.readObject();
}
@Override
@@ -296,7 +299,6 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
objectInspectors);
}
- hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index ab84068..502f7a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
@@ -59,7 +60,7 @@ import java.util.stream.Collectors;
/**
* Table sink to write to Hive tables.
*/
-public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink {
+public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink, OverwritableTableSink {
private final JobConf jobConf;
private final CatalogTable catalogTable;
@@ -69,7 +70,6 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
private Map<String, String> staticPartitionSpec = Collections.emptyMap();
- // TODO: need OverwritableTableSink to configure this
private boolean overwrite = false;
public HiveTableSink(JobConf jobConf, ObjectPath tablePath, CatalogTable table) {
@@ -201,4 +201,9 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
}
}
}
+
+ @Override
+ public void setOverwrite(boolean overwrite) {
+ this.overwrite = overwrite;
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
deleted file mode 100644
index db072b4..0000000
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests {@link HiveTableOutputFormatTest}.
- */
-public class HiveTableOutputFormatTest {
-
- private static HiveCatalog hiveCatalog;
- private static HiveConf hiveConf;
-
- @BeforeClass
- public static void createCatalog() {
- hiveConf = HiveTestUtils.createHiveConf();
- hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
- hiveCatalog.open();
- }
-
- @AfterClass
- public static void closeCatalog() {
- if (hiveCatalog != null) {
- hiveCatalog.close();
- }
- }
-
- @Test
- public void testInsertOverwrite() throws Exception {
- String dbName = "default";
- String tblName = "dest";
- createDestTable(dbName, tblName, 0);
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- CatalogBaseTable table = hiveCatalog.getTable(tablePath);
- Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-
- // write some data and verify
- HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, null, false);
- outputFormat.open(0, 1);
- List<Row> toWrite = generateRecords(5);
- writeRecords(toWrite, outputFormat);
- outputFormat.close();
- outputFormat.finalizeGlobal(1);
- verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
-
- // write some data to overwrite existing data and verify
- outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, null, true);
- outputFormat.open(0, 1);
- toWrite = generateRecords(3);
- writeRecords(toWrite, outputFormat);
- outputFormat.close();
- outputFormat.finalizeGlobal(1);
- verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
-
- hiveCatalog.dropTable(tablePath, false);
- }
-
- private void createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
- TableSchema tableSchema = new TableSchema(
- new String[]{"i", "l", "d", "s"},
- new TypeInformation[]{
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO}
- );
- CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
- hiveCatalog.createTable(tablePath, catalogTable, false);
- }
-
- private CatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
- if (numPartCols == 0) {
- return new CatalogTableImpl(tableSchema, new HashMap<>(), "");
- }
- String[] partCols = new String[numPartCols];
- System.arraycopy(tableSchema.getFieldNames(), tableSchema.getFieldNames().length - numPartCols, partCols, 0, numPartCols);
- return new CatalogTableImpl(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
- }
-
- private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath tablePath, CatalogTable catalogTable, Table hiveTable,
- Map<String, Object> partSpec, boolean overwrite) throws Exception {
- StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
- jobSD.setLocation(hiveTable.getSd().getLocation() + "/.staging");
- HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, partSpec);
- JobConf jobConf = new JobConf(hiveConf);
- return new HiveTableOutputFormat(jobConf, tablePath, catalogTable, hiveTablePartition,
- MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
- }
-
- private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
- FileSystem fs = outputFile.getFileSystem(hiveConf);
- assertTrue(fs.exists(outputFile));
- int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
- int numWritten = 0;
- String line = reader.readLine();
- while (line != null) {
- Row expectedRow = Row.project(expected.get(numWritten++), fields);
- assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
- line = reader.readLine();
- }
- reader.close();
- assertEquals(expected.size(), numWritten);
- }
- }
-
- private void writeRecords(List<Row> toWrite, HiveTableOutputFormat outputFormat) throws IOException {
- for (Row row : toWrite) {
- outputFormat.writeRecord(row);
- }
- }
-
- private List<Row> generateRecords(int numRecords) {
- int arity = 4;
- List<Row> res = new ArrayList<>(numRecords);
- for (int i = 0; i < numRecords; i++) {
- Row row = new Row(arity);
- row.setField(0, i);
- row.setField(1, (long) i);
- row.setField(2, Double.valueOf(String.valueOf(String.format("%d.%d", i, i))));
- row.setField(3, String.valueOf((char) ('a' + i)));
- res.add(row);
- }
- return res;
- }
-}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 095170b..fe54eac 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,21 +23,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
@@ -45,18 +41,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/**
* Tests {@link HiveTableSink}.
@@ -101,11 +93,7 @@ public class HiveTableSinkTest {
tableEnv.sqlQuery("select * from src").insertInto("destSink");
execEnv.execute();
- List<String> result = hiveShell.executeQuery("select * from " + tblName);
- assertEquals(toWrite.size(), result.size());
- for (int i = 0; i < result.size(); i++) {
- assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
- }
+ verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
hiveCatalog.dropTable(tablePath, false);
}
@@ -130,10 +118,7 @@ public class HiveTableSinkTest {
List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
assertEquals(toWrite.size(), partitionSpecs.size());
- List<String> result = hiveShell.executeQuery("select * from " + tblName);
- for (int i = 0; i < result.size(); i++) {
- assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
- }
+ verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
hiveCatalog.dropTable(tablePath, false);
}
@@ -235,10 +220,44 @@ public class HiveTableSinkTest {
// make sure new partition is created
assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
- CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));
- String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
- verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+ verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+ hiveCatalog.dropTable(tablePath, false);
+ }
+
+ @Test
+ public void testInsertOverwrite() throws Exception {
+ String dbName = "default";
+ String tblName = "dest";
+ RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+ ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+ ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+ BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+
+ // write some data and verify
+ List<Row> toWrite = generateRecords(5);
+ tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+ CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
+ tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
+ tableEnv.sqlQuery("select * from src").insertInto("destSink");
+ execEnv.execute();
+
+ verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+ // write some data to overwrite existing data and verify
+ toWrite = generateRecords(3);
+ tableEnv.registerDataSet("src1", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+ HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
+ sink.setOverwrite(true);
+ tableEnv.registerTableSink("destSink1", sink);
+ tableEnv.sqlQuery("select * from src1").insertInto("destSink1");
+ execEnv.execute();
+
+ verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
hiveCatalog.dropTable(tablePath, false);
}
@@ -269,20 +288,10 @@ public class HiveTableSinkTest {
return new CatalogTableImpl(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
}
- private void verifyWrittenData(org.apache.hadoop.fs.Path outputFile, List<Row> expected, int numPartCols) throws Exception {
- FileSystem fs = outputFile.getFileSystem(hiveConf);
- assertTrue(fs.exists(outputFile));
- int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
- int numWritten = 0;
- String line = reader.readLine();
- while (line != null) {
- Row expectedRow = Row.project(expected.get(numWritten++), fields);
- assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
- line = reader.readLine();
- }
- reader.close();
- assertEquals(expected.size(), numWritten);
+ private void verifyWrittenData(List<Row> expected, List<String> results) throws Exception {
+ assertEquals(expected.size(), results.size());
+ for (int i = 0; i < results.size(); i++) {
+ assertEquals(expected.get(i).toString().replaceAll(",", "\t"), results.get(i));
}
}