Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/15 08:17:59 UTC

[flink] branch release-1.11 updated: [FLINK-18197][hive] Add more logs for hive streaming integration

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 26af028  [FLINK-18197][hive] Add more logs for hive streaming integration
26af028 is described below

commit 26af028d742f6ae54d62fc40c3e240270a0aadbc
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 15 16:13:27 2020 +0800

    [FLINK-18197][hive] Add more logs for hive streaming integration
    
    This closes #12625
---
 .../hive/read/HiveContinuousMonitoringFunction.java         |  2 ++
 .../flink/table/filesystem/FileSystemLookupFunction.java    | 13 +++++++++++++
 .../flink/table/filesystem/MetastoreCommitPolicy.java       | 11 +++++++----
 .../flink/table/filesystem/PartitionCommitPolicy.java       | 12 ++++++++++++
 .../flink/table/filesystem/SuccessFileCommitPolicy.java     |  6 ++++++
 .../table/filesystem/stream/StreamingFileCommitter.java     |  1 +
 6 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index 893e880..dff6580 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -293,6 +293,8 @@ public class HiveContinuousMonitoringFunction
 				if (timestamp > maxTimestamp) {
 					maxTimestamp = timestamp;
 				}
+				LOG.info("Found new partition {} of table {}, forwarding splits to downstream readers",
+						partSpec, tablePath.getFullName());
 				HiveTableInputSplit[] splits = HiveTableInputFormat.createInputSplits(
 						this.readerParallelism,
 						Collections.singletonList(toHiveTablePartition(partition)),
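
The new log line in HiveContinuousMonitoringFunction uses SLF4J parameterized messages, so the partition spec and table name are only formatted when INFO logging is enabled. A minimal standalone sketch of that pattern (the class name and literal values below are illustrative, not taken from the Flink sources):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PartitionLogSketch {

        private static final Logger LOG = LoggerFactory.getLogger(PartitionLogSketch.class);

        public static void main(String[] args) {
            String partSpec = "pt=2020-06-15";   // hypothetical partition spec
            String table = "default.my_table";   // hypothetical table name
            // {} placeholders defer string building until the level is enabled.
            LOG.info("Found new partition {} of table {}, forwarding splits to downstream readers",
                    partSpec, table);
        }
    }
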
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index 7219d49..e3bed30 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -37,6 +37,9 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,6 +57,8 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+
 	private final InputFormat<RowData, T> inputFormat;
 	// names and types of the records returned by the input format
 	private final String[] producedNames;
@@ -133,14 +138,21 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 		if (nextLoadTime > System.currentTimeMillis()) {
 			return;
 		}
+		if (nextLoadTime > 0) {
+			LOG.info("Lookup join cache has expired after {} minute(s), reloading", getCacheTTL().toMinutes());
+		} else {
+			LOG.info("Populating lookup join cache");
+		}
 		cache.clear();
 		try {
 			T[] inputSplits = inputFormat.createInputSplits(1);
 			GenericRowData reuse = new GenericRowData(producedNames.length);
+			long count = 0;
 			for (T split : inputSplits) {
 				inputFormat.open(split);
 				while (!inputFormat.reachedEnd()) {
 					RowData row = inputFormat.nextRecord(reuse);
+					count++;
 					Row key = extractKey(row);
 					List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
 					rows.add(serializer.copy(row));
@@ -148,6 +160,7 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 				inputFormat.close();
 			}
 			nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+			LOG.info("Loaded {} row(s) into lookup join cache", count);
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Failed to load table into cache", e);
 		}
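
The cache-reload hunk above follows a simple TTL pattern: nextLoadTime starts negative, so the first call logs the "Populating" message; afterwards a reload only happens once System.currentTimeMillis() passes nextLoadTime, and the TTL is re-armed after a successful load. A self-contained sketch of just that control flow (the load step is stubbed out and the names are illustrative):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    public class TtlReloadSketch {

        private final Duration cacheTtl = Duration.ofMinutes(60);
        private final Map<String, String> cache = new HashMap<>();
        private long nextLoadTime = -1L; // negative means the cache was never loaded

        void checkCacheExpiration() {
            if (nextLoadTime > System.currentTimeMillis()) {
                return; // cache is still fresh, nothing to do
            }
            // nextLoadTime > 0 means an expired cache (reload);
            // otherwise this is the first load, as in the log messages above.
            cache.clear();
            // ... scan the backing files and repopulate the cache here ...
            nextLoadTime = System.currentTimeMillis() + cacheTtl.toMillis();
        }
    }
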
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
index 46f5d3c..cd56e9c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.filesystem;
 
 import org.apache.flink.table.filesystem.TableMetaStoreFactory.TableMetaStore;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.LinkedHashMap;
 
 /**
@@ -30,6 +33,8 @@ import java.util.LinkedHashMap;
  */
 public class MetastoreCommitPolicy implements PartitionCommitPolicy {
 
+	private static final Logger LOG = LoggerFactory.getLogger(MetastoreCommitPolicy.class);
+
 	private TableMetaStore metaStore;
 
 	public void setMetastore(TableMetaStore metaStore) {
@@ -38,10 +43,8 @@ public class MetastoreCommitPolicy implements PartitionCommitPolicy {
 
 	@Override
 	public void commit(Context context) throws Exception {
-		LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
-		for (int i = 0; i < context.partitionKeys().size(); i++) {
-			partitionSpec.put(context.partitionKeys().get(i), context.partitionValues().get(i));
-		}
+		LinkedHashMap<String, String> partitionSpec = context.partitionSpec();
 		metaStore.createOrAlterPartition(partitionSpec, context.partitionPath());
+		LOG.info("Committed partition {} to metastore", partitionSpec);
 	}
 }
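
The change to MetastoreCommitPolicy.commit replaces the inline key/value loop with the Context#partitionSpec() default method introduced in the next file; the underlying pattern is just zipping the parallel partitionKeys() and partitionValues() lists into an insertion-ordered LinkedHashMap. A standalone illustration with hypothetical values:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;

    public class PartitionSpecSketch {
        public static void main(String[] args) {
            List<String> keys = Arrays.asList("dt", "hr");            // partitionKeys()
            List<String> values = Arrays.asList("2020-06-15", "08");  // partitionValues()
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            for (int i = 0; i < keys.size(); i++) {
                spec.put(keys.get(i), values.get(i));
            }
            System.out.println(spec); // prints {dt=2020-06-15, hr=08}
        }
    }
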
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
index d2c2c7c..8b2cc1c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.ValidationException;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -90,6 +91,17 @@ public interface PartitionCommitPolicy {
 		 * Path of this partition.
 		 */
 		Path partitionPath();
+
+		/**
+		 * Partition spec in the form of a map from partition keys to values.
+		 */
+		default LinkedHashMap<String, String> partitionSpec() {
+			LinkedHashMap<String, String> res = new LinkedHashMap<>();
+			for (int i = 0; i < partitionKeys().size(); i++) {
+				res.put(partitionKeys().get(i), partitionValues().get(i));
+			}
+			return res;
+		}
 	}
 
 	/**
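
Since PartitionCommitPolicy is a public interface, the same Context (including the new partitionSpec() default method) is available to user-defined policies. A hedged sketch of a custom policy that only logs what would be committed, assuming the flink-table-runtime-blink classes shown in this diff are on the classpath:

    import org.apache.flink.table.filesystem.PartitionCommitPolicy;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingCommitPolicy implements PartitionCommitPolicy {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingCommitPolicy.class);

        @Override
        public void commit(Context context) throws Exception {
            // partitionSpec() is the default method added by this commit.
            LOG.info("Would commit partition {} at path {}",
                    context.partitionSpec(), context.partitionPath());
        }
    }
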
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
index 4472e54..94d6a46 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
@@ -21,12 +21,17 @@ package org.apache.flink.table.filesystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Partition commit policy that adds a success file to the partition directory. The file name
  * is configurable and the file itself is empty.
  */
 public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SuccessFileCommitPolicy.class);
+
 	private final String fileName;
 	private final FileSystem fileSystem;
 
@@ -40,5 +45,6 @@ public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
 		fileSystem.create(
 				new Path(context.partitionPath(), fileName),
 				FileSystem.WriteMode.OVERWRITE).close();
+		LOG.info("Committed partition {} with success file", context.partitionSpec());
 	}
 }
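
SuccessFileCommitPolicy marks a partition as committed by creating an empty, configurably named marker file (conventionally _SUCCESS) with overwrite semantics, so re-committing the same partition is idempotent. The same idea with only the JDK, using an illustrative local path:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class SuccessFileSketch {
        public static void main(String[] args) throws IOException {
            Path partitionDir = Paths.get("/tmp/warehouse/my_table/dt=2020-06-15");
            Files.createDirectories(partitionDir);
            // Empty marker file; Files.write truncates an existing file,
            // mirroring FileSystem.WriteMode.OVERWRITE above.
            Files.write(partitionDir.resolve("_SUCCESS"), new byte[0]);
        }
    }
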
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
index ec63faf..526970e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
@@ -156,6 +156,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
 		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
 			for (String partition : partitions) {
 				LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+				LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
 				Path path = new Path(locationPath, generatePartitionPath(partSpec));
 				PartitionCommitPolicy.Context context = new PolicyContext(
 						new ArrayList<>(partSpec.values()), path);
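
The committer derives the partition spec from the partition path via extractPartitionSpecFromPath. The diff does not show that utility, but a Hive-style path such as dt=2020-06-15/hr=08 parses back into an ordered spec roughly like this (a sketch, not the actual Flink helper):

    import java.util.LinkedHashMap;

    public class PartitionPathSketch {

        static LinkedHashMap<String, String> parse(String partitionPath) {
            LinkedHashMap<String, String> spec = new LinkedHashMap<>();
            for (String segment : partitionPath.split("/")) {
                String[] kv = segment.split("=", 2);
                if (kv.length == 2) {
                    spec.put(kv[0], kv[1]);
                }
            }
            return spec;
        }

        public static void main(String[] args) {
            System.out.println(parse("dt=2020-06-15/hr=08")); // {dt=2020-06-15, hr=08}
        }
    }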