Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/09 18:55:25 UTC

[flink] branch master updated: [hotfix][hive] resolve bad merge in HiveTableSinkTest

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fac54e9  [hotfix][hive] resolve bad merge in HiveTableSinkTest
fac54e9 is described below

commit fac54e945897c24c32bbfbd3ec0b7b6832309131
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Jul 9 11:54:09 2019 -0700

    [hotfix][hive] resolve bad merge in HiveTableSinkTest
    
    Resolve a bad merge between #8965 and #8987: each passed CI separately, but the merged result broke HiveTableSinkTest.
---
 .../connectors/hive/HiveTableOutputFormatTest.java |  2 +-
 .../batch/connectors/hive/HiveTableSinkTest.java   | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
index 1d9a662..db072b4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormatTest.java
@@ -62,7 +62,7 @@ public class HiveTableOutputFormatTest {
 	private static HiveConf hiveConf;
 
 	@BeforeClass
-	public static void createCatalog() throws IOException {
+	public static void createCatalog() {
 		hiveConf = HiveTestUtils.createHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index e31f92f..095170b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -23,17 +23,21 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveCatalogConfig;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
@@ -41,14 +45,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link HiveTableSink}.
@@ -261,6 +269,23 @@ public class HiveTableSinkTest {
 		return new CatalogTableImpl(tableSchema, Arrays.asList(partCols), new HashMap<>(), "");
 	}
 
+	private void verifyWrittenData(org.apache.hadoop.fs.Path outputFile, List<Row> expected, int numPartCols) throws Exception {
+		FileSystem fs = outputFile.getFileSystem(hiveConf);
+		assertTrue(fs.exists(outputFile));
+		int[] fields = IntStream.range(0, expected.get(0).getArity() - numPartCols).toArray();
+		try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(outputFile)))) {
+			int numWritten = 0;
+			String line = reader.readLine();
+			while (line != null) {
+				Row expectedRow = Row.project(expected.get(numWritten++), fields);
+				assertEquals(expectedRow.toString(), line.replaceAll("\u0001", ","));
+				line = reader.readLine();
+			}
+			reader.close();
+			assertEquals(expected.size(), numWritten);
+		}
+	}
+
 	private List<Row> generateRecords(int numRecords) {
 		int arity = 4;
 		List<Row> res = new ArrayList<>(numRecords);