You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2021/05/20 14:36:32 UTC
[tez] branch master updated: TEZ-4305: Check StreamCapabilities
before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by
Laszlo Bodor and Harish JP)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 4d37914 TEZ-4305: Check StreamCapabilities before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by Laszlo Bodor and Harish JP)
4d37914 is described below
commit 4d379149bcdd0c0097f366d9a01e6a9907410ac6
Author: Kishen Das <ki...@gmail.com>
AuthorDate: Thu May 20 07:36:20 2021 -0700
TEZ-4305: Check StreamCapabilities before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by Laszlo Bodor and Harish JP)
Co-authored-by: Kishen Das <ki...@cloudera.com>
---
.../java/org/apache/tez/common/StreamHelper.java | 49 ++++++++++++++++++++++
.../logging/impl/SimpleHistoryLoggingService.java | 3 +-
.../history/logging/proto/ProtoMessageWriter.java | 3 +-
3 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java b/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java
new file mode 100644
index 0000000..789d9b4
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Static helpers for flushing Hadoop {@link Syncable} output streams. */
+public final class StreamHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamHelper.class);
+
+  private StreamHelper() {
+    // utility class: static methods only, never instantiated
+  }
+
+  /**
+   * Calls {@code hflush()} unless the stream implements {@link StreamCapabilities}
+   * and reports that it lacks {@link StreamCapabilities#HFLUSH}; then it only logs.
+   */
+  public static void hflushIfSupported(Syncable syncable) throws IOException {
+    // Streams without StreamCapabilities (older Hadoop versions) are always flushed.
+    if (syncable instanceof StreamCapabilities
+        && !((StreamCapabilities) syncable).hasCapability(StreamCapabilities.HFLUSH)) {
+      // Treated as a no-op when the writer declares hflush unsupported.
+      LOG.debug("skipping hflush, since the writer doesn't support it");
+      return;
+    }
+    syncable.hflush();
+  }
+}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 4372d8e..418bc3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.tez.common.StreamHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -130,7 +131,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
}
try {
if (outputStream != null) {
- outputStream.hflush();
+ StreamHelper.hflushIfSupported(outputStream);
outputStream.close();
}
} catch (IOException ioe) {
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index 869b603..5b7591b 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile.Writer;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
+import org.apache.tez.common.StreamHelper;
public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
private final Path filePath;
@@ -61,7 +62,7 @@ public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
}
public void hflush() throws IOException {
- writer.hflush();
+ StreamHelper.hflushIfSupported(writer);
}
@Override