Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/15 16:31:09 UTC

[flink] branch master updated: [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d913de1  [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
d913de1 is described below

commit d913de10094dc9d63f9a4599da033d89d25154d2
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Jul 10 17:59:36 2019 +0800

    [FLINK-13069][hive] HiveTableSink should implement OverwritableTableSink
    
    This PR makes HiveTableSink implement OverwritableTableSink.
    
    This closes #9067.
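    
    A minimal sketch of how the new overwrite path can be exercised, mirroring the
    testInsertOverwrite case added in this commit; the catalog/table ("default.dest"),
    the registered source table "src", and the surrounding hiveCatalog/hiveConf/
    tableEnv/execEnv setup are assumed to exist and are illustrative only:
    
        // look up the target Hive table through an already-opened HiveCatalog
        ObjectPath tablePath = new ObjectPath("default", "dest");
        CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
    
        // build the sink and request overwrite semantics via OverwritableTableSink
        HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
        sink.setOverwrite(true);
    
        // register the sink and write the query result; existing data is replaced
        // rather than appended
        tableEnv.registerTableSink("destSink", sink);
        tableEnv.sqlQuery("select * from src").insertInto("destSink");
        execEnv.execute();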
---
 .../connectors/hive/HiveTableOutputFormat.java     |   4 +-
 .../flink/batch/connectors/hive/HiveTableSink.java |   9 +-
 .../connectors/hive/HiveTableOutputFormatTest.java | 177 ---------------------
 .../batch/connectors/hive/HiveTableSinkTest.java   |  77 +++++----
 4 files changed, 53 insertions(+), 214 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
index 898ab53..acb1bf9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
@@ -160,6 +160,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		this.overwrite = overwrite;
 		isPartitioned = partitionColumns != null && !partitionColumns.isEmpty();
 		isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
+		hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
 	}
 
 	//  Custom serialization methods
@@ -175,6 +176,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		out.writeObject(partitionColumns);
 		out.writeObject(tablePath);
 		out.writeObject(tableProperties);
+		out.writeObject(hiveVersion);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -198,6 +200,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 		tablePath = (ObjectPath) in.readObject();
 		partitionToWriter = new HashMap<>();
 		tableProperties = (Properties) in.readObject();
+		hiveVersion = (String) in.readObject();
 	}
 
 	@Override
@@ -296,7 +299,6 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
 				Arrays.asList(rowTypeInfo.getFieldNames()).subList(0, rowTypeInfo.getArity() - partitionColumns.size()),
 				objectInspectors);
 		}
-		hiveVersion = jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION, HiveShimLoader.getHiveVersion());
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
index ab84068..502f7a9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableSink.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
@@ -59,7 +60,7 @@ import java.util.stream.Collectors;
 /**
  * Table sink to write to Hive tables.
  */
-public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink {
+public class HiveTableSink extends OutputFormatTableSink<Row> implements PartitionableTableSink, OverwritableTableSink {
 
 	private final JobConf jobConf;
 	private final CatalogTable catalogTable;
@@ -69,7 +70,6 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 
 	private Map<String, String> staticPartitionSpec = Collections.emptyMap();
 
-	// TODO: need OverwritableTableSink to configure this
 	private boolean overwrite = false;
 
 	public HiveTableSink(JobConf jobConf, ObjectPath tablePath, CatalogTable table) {
@@ -201,4 +201,9 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti
 			}
 		}
 	}
+
+	@Override
+	public void setOverwrite(boolean overwrite) {
+		this.overwrite = overwrite;
+	}
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
deleted file mode 100644
index db072b4..0000000
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.batch.connectors.hive;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests {@link HiveTableOutputFormatTest}.
- */
-public class HiveTableOutputFormatTest {
-
-	private static HiveCatalog hiveCatalog;
-	private static HiveConf hiveConf;
-
-	@BeforeClass
-	public static void createCatalog() {
-		hiveConf = HiveTestUtils.createHiveConf();
-		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
-		hiveCatalog.open();
-	}
-
-	@AfterClass
-	public static void closeCatalog() {
-		if (hiveCatalog != null) {
-			hiveCatalog.close();
-		}
-	}
-
-	@Test
-	public void testInsertOverwrite() throws Exception {
-		String dbName = "default";
-		String tblName = "dest";
-		createDestTable(dbName, tblName, 0);
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		CatalogBaseTable table = hiveCatalog.getTable(tablePath);
-		Table hiveTable = hiveCatalog.getHiveTable(tablePath);
-
-		// write some data and verify
-		HiveTableOutputFormat outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, null, false);
-		outputFormat.open(0, 1);
-		List<Row> toWrite = generateRecords(5);
-		writeRecords(toWrite, outputFormat);
-		outputFormat.close();
-		outputFormat.finalizeGlobal(1);
-		verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
-
-		// write some data to overwrite existing data and verify
-		outputFormat = createHiveTableOutputFormat(tablePath, (CatalogTableImpl) table, hiveTable, null, true);
-		outputFormat.open(0, 1);
-		toWrite = generateRecords(3);
-		writeRecords(toWrite, outputFormat);
-		outputFormat.close();
-		outputFormat.finalizeGlobal(1);
-		verifyWrittenData(new Path(hiveTable.getSd().getLocation(), "0"), toWrite, 0);
-
-		hiveCatalog.dropTable(tablePath, false);
-	}
-
-	private void createDestTable(String dbName, String tblName, int numPartCols) throws Exception {
-		ObjectPath tablePath = new ObjectPath(dbName, tblName);
-		TableSchema tableSchema = new TableSchema(
-				new String[]{"i", "l", "d", "s"},
-				new TypeInformation[]{
-						BasicTypeInfo.INT_TYPE_INFO,
-						BasicTypeInfo.LONG_TYPE_INFO,
-						BasicTypeInfo.DOUBLE_TYPE_INFO,
-						BasicTypeInfo.STRING_TYPE_INFO}
-		);
-		CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
-		hiveCatalog.createTable(tablePath, catalogTable, false);
-	}
-
-	private CatalogTable createCatalogTable(TableSchema tableSchema, int numPartCols) {
-		if (numPartCols == 0) {
-			return new CatalogTableImpl(tableSchema, new HashMap<>(), "");
-		}
-		String[] partCols = new String[numPartCols];
-		System.arraycopy(tableSchema.getFieldNames(), tableSchema.getFieldNames().length - numPartCols, partCols, 0, numPartCols);
-		return new CatalogTableImpl(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
-	}
-
-	private HiveTableOutputFormat createHiveTableOutputFormat(ObjectPath tablePath, CatalogTable catalogTable, Table hiveTable,
-			Map<String, Object> partSpec, boolean overwrite) throws Exception {
-		StorageDescriptor jobSD = hiveTable.getSd().deepCopy();
-		jobSD.setLocation(hiveTable.getSd().getLocation() + "/.staging");
-		HiveTablePartition hiveTablePartition = new HiveTablePartition(jobSD, partSpec);
-		JobConf jobConf = new JobConf(hiveConf);
-		return new HiveTableOutputFormat(jobConf, tablePath, catalogTable, hiveTablePartition,
-			MetaStoreUtils.getTableMetadata(hiveTable), overwrite);
-	}
-
-	private void verifyWrittenData(Path outputFile, List<Row> expected, int numPartCols) throws Exception {
-		FileSystem fs = outputFile.getFileSystem(hiveConf);
-		assertTrue(fs.exists(outputFile));
-		int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
-		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
-			int numWritten = 0;
-			String line = reader.readLine();
-			while (line != null) {
-				Row expectedRow = Row.project(expected.get(numWritten++), fields);
-				assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
-				line = reader.readLine();
-			}
-			reader.close();
-			assertEquals(expected.size(), numWritten);
-		}
-	}
-
-	private void writeRecords(List<Row> toWrite, HiveTableOutputFormat outputFormat) throws IOException {
-		for (Row row : toWrite) {
-			outputFormat.writeRecord(row);
-		}
-	}
-
-	private List<Row> generateRecords(int numRecords) {
-		int arity = 4;
-		List<Row> res = new ArrayList<>(numRecords);
-		for (int i = 0; i < numRecords; i++) {
-			Row row = new Row(arity);
-			row.setField(0, i);
-			row.setField(1, (long) i);
-			row.setField(2, Double.valueOf(String.valueOf(String.format("%d.%d", i, i))));
-			row.setField(3, String.valueOf((char) ('a' + i)));
-			res.add(row);
-		}
-		return res;
-	}
-}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index 095170b..fe54eac 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,21 +23,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
@@ -45,18 +41,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link HiveTableSink}.
@@ -101,11 +93,7 @@ public class HiveTableSinkTest {
 		tableEnv.sqlQuery("select * from src").insertInto("destSink");
 		execEnv.execute();
 
-		List<String> result = hiveShell.executeQuery("select * from " + tblName);
-		assertEquals(toWrite.size(), result.size());
-		for (int i = 0; i < result.size(); i++) {
-			assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
-		}
+		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
 		hiveCatalog.dropTable(tablePath, false);
 	}
@@ -130,10 +118,7 @@ public class HiveTableSinkTest {
 		List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
 		assertEquals(toWrite.size(), partitionSpecs.size());
 
-		List<String> result = hiveShell.executeQuery("select * from " + tblName);
-		for (int i = 0; i < result.size(); i++) {
-			assertEquals(toWrite.get(i).toString().replaceAll(",", "\t"), result.get(i));
-		}
+		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
 		hiveCatalog.dropTable(tablePath, false);
 	}
@@ -235,10 +220,44 @@ public class HiveTableSinkTest {
 
 		// make sure new partition is created
 		assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-		CatalogPartition catalogPartition = hiveCatalog.getPartition(tablePath, new CatalogPartitionSpec(partSpec));
 
-		String partitionLocation = catalogPartition.getProperties().get(HiveCatalogConfig.PARTITION_LOCATION);
-		verifyWrittenData(new Path(partitionLocation, "0"), toWrite, 1);
+		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+		hiveCatalog.dropTable(tablePath, false);
+	}
+
+	@Test
+	public void testInsertOverwrite() throws Exception {
+		String dbName = "default";
+		String tblName = "dest";
+		RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 0);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
+
+		// write some data and verify
+		List<Row> toWrite = generateRecords(5);
+		tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+		CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
+		tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
+		tableEnv.sqlQuery("select * from src").insertInto("destSink");
+		execEnv.execute();
+
+		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
+
+		// write some data to overwrite existing data and verify
+		toWrite = generateRecords(3);
+		tableEnv.registerDataSet("src1", execEnv.fromCollection(toWrite, rowTypeInfo));
+
+		HiveTableSink sink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
+		sink.setOverwrite(true);
+		tableEnv.registerTableSink("destSink1", sink);
+		tableEnv.sqlQuery("select * from src1").insertInto("destSink1");
+		execEnv.execute();
+
+		verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
 
 		hiveCatalog.dropTable(tablePath, false);
 	}
@@ -269,20 +288,10 @@ public class HiveTableSinkTest {
 		return new CatalogTableImpl(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
 	}
 
-	private void verifyWrittenData(org.apache.hadoop.fs.Path outputFile, List<Row> expected, int numPartCols) throws Exception {
-		FileSystem fs = outputFile.getFileSystem(hiveConf);
-		assertTrue(fs.exists(outputFile));
-		int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
-		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
-			int numWritten = 0;
-			String line = reader.readLine();
-			while (line != null) {
-				Row expectedRow = Row.project(expected.get(numWritten++), fields);
-				assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
-				line = reader.readLine();
-			}
-			reader.close();
-			assertEquals(expected.size(), numWritten);
+	private void verifyWrittenData(List<Row> expected, List<String> results) throws Exception {
+		assertEquals(expected.size(), results.size());
+		for (int i = 0; i < results.size(); i++) {
+			assertEquals(expected.get(i).toString().replaceAll(",", "\t"), results.get(i));
 		}
 	}