You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/15 08:17:59 UTC
[flink] branch release-1.11 updated: [FLINK-18197][hive] Add more logs for hive streaming integration
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 26af028 [FLINK-18197][hive] Add more logs for hive streaming integration
26af028 is described below
commit 26af028d742f6ae54d62fc40c3e240270a0aadbc
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 15 16:13:27 2020 +0800
[FLINK-18197][hive] Add more logs for hive streaming integration
This closes #12625
---
.../hive/read/HiveContinuousMonitoringFunction.java | 2 ++
.../flink/table/filesystem/FileSystemLookupFunction.java | 13 +++++++++++++
.../flink/table/filesystem/MetastoreCommitPolicy.java | 11 +++++++----
.../flink/table/filesystem/PartitionCommitPolicy.java | 12 ++++++++++++
.../flink/table/filesystem/SuccessFileCommitPolicy.java | 6 ++++++
.../table/filesystem/stream/StreamingFileCommitter.java | 1 +
6 files changed, 41 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index 893e880..dff6580 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -293,6 +293,8 @@ public class HiveContinuousMonitoringFunction
if (timestamp > maxTimestamp) {
maxTimestamp = timestamp;
}
+ LOG.info("Found new partition {} of table {}, forwarding splits to downstream readers",
+ partSpec, tablePath.getFullName());
HiveTableInputSplit[] splits = HiveTableInputFormat.createInputSplits(
this.readerParallelism,
Collections.singletonList(toHiveTablePartition(partition)),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index 7219d49..e3bed30 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -37,6 +37,9 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -54,6 +57,8 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemLookupFunction.class);
+
private final InputFormat<RowData, T> inputFormat;
// names and types of the records returned by the input format
private final String[] producedNames;
@@ -133,14 +138,21 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
if (nextLoadTime > System.currentTimeMillis()) {
return;
}
+ if (nextLoadTime > 0) {
+ LOG.info("Lookup join cache has expired after {} minute(s), reloading", getCacheTTL().toMinutes());
+ } else {
+ LOG.info("Populating lookup join cache");
+ }
cache.clear();
try {
T[] inputSplits = inputFormat.createInputSplits(1);
GenericRowData reuse = new GenericRowData(producedNames.length);
+ long count = 0;
for (T split : inputSplits) {
inputFormat.open(split);
while (!inputFormat.reachedEnd()) {
RowData row = inputFormat.nextRecord(reuse);
+ count++;
Row key = extractKey(row);
List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
rows.add(serializer.copy(row));
@@ -148,6 +160,7 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
inputFormat.close();
}
nextLoadTime = System.currentTimeMillis() + getCacheTTL().toMillis();
+ LOG.info("Loaded {} row(s) into lookup join cache", count);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to load table into cache", e);
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
index 46f5d3c..cd56e9c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.filesystem;
import org.apache.flink.table.filesystem.TableMetaStoreFactory.TableMetaStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.LinkedHashMap;
/**
@@ -30,6 +33,8 @@ import java.util.LinkedHashMap;
*/
public class MetastoreCommitPolicy implements PartitionCommitPolicy {
+ private static final Logger LOG = LoggerFactory.getLogger(MetastoreCommitPolicy.class);
+
private TableMetaStore metaStore;
public void setMetastore(TableMetaStore metaStore) {
@@ -38,10 +43,8 @@ public class MetastoreCommitPolicy implements PartitionCommitPolicy {
@Override
public void commit(Context context) throws Exception {
- LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
- for (int i = 0; i < context.partitionKeys().size(); i++) {
- partitionSpec.put(context.partitionKeys().get(i), context.partitionValues().get(i));
- }
+ LinkedHashMap<String, String> partitionSpec = context.partitionSpec();
metaStore.createOrAlterPartition(partitionSpec, context.partitionPath());
+ LOG.info("Committed partition {} to metastore", partitionSpec);
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
index d2c2c7c..8b2cc1c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.ValidationException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -90,6 +91,17 @@ public interface PartitionCommitPolicy {
* Path of this partition.
*/
Path partitionPath();
+
+ /**
+ * Partition spec in the form of a map from partition keys to values.
+ */
+ default LinkedHashMap<String, String> partitionSpec() {
+ LinkedHashMap<String, String> res = new LinkedHashMap<>();
+ for (int i = 0; i < partitionKeys().size(); i++) {
+ res.put(partitionKeys().get(i), partitionValues().get(i));
+ }
+ return res;
+ }
}
/**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
index 4472e54..94d6a46 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
@@ -21,12 +21,17 @@ package org.apache.flink.table.filesystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Partition commit policy to add success file to directory. Success file is configurable and
* empty file.
*/
public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
+ private static final Logger LOG = LoggerFactory.getLogger(SuccessFileCommitPolicy.class);
+
private final String fileName;
private final FileSystem fileSystem;
@@ -40,5 +45,6 @@ public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
fileSystem.create(
new Path(context.partitionPath(), fileName),
FileSystem.WriteMode.OVERWRITE).close();
+ LOG.info("Committed partition {} with success file", context.partitionSpec());
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
index ec63faf..526970e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
@@ -156,6 +156,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {
for (String partition : partitions) {
LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition));
+ LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
Path path = new Path(locationPath, generatePartitionPath(partSpec));
PartitionCommitPolicy.Context context = new PolicyContext(
new ArrayList<>(partSpec.values()), path);