You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2021/05/20 14:36:32 UTC

[tez] branch master updated: TEZ-4305: Check StreamCapabilities before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by Laszlo Bodor and Harish JP)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d37914  TEZ-4305: Check StreamCapabilities before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by Laszlo Bodor and Harish JP)
4d37914 is described below

commit 4d379149bcdd0c0097f366d9a01e6a9907410ac6
Author: Kishen Das <ki...@gmail.com>
AuthorDate: Thu May 20 07:36:20 2021 -0700

    TEZ-4305: Check StreamCapabilities before using HFLUSH from ProtoMessageWriter (#120) (Kishen Das reviewed by Laszlo Bodor and Harish JP)
    
    Co-authored-by: Kishen Das <ki...@cloudera.com>
---
 .../java/org/apache/tez/common/StreamHelper.java   | 49 ++++++++++++++++++++++
 .../logging/impl/SimpleHistoryLoggingService.java  |  3 +-
 .../history/logging/proto/ProtoMessageWriter.java  |  3 +-
 3 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java b/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java
new file mode 100644
index 0000000..789d9b4
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/StreamHelper.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public final class StreamHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamHelper.class);
+
+  private StreamHelper() {
+  }
+
+  public static void hflushIfSupported(Syncable syncable) throws IOException {
+    if (syncable instanceof StreamCapabilities) {
+      if (((StreamCapabilities) syncable).hasCapability(StreamCapabilities.HFLUSH)) {
+        syncable.hflush();
+      } else {
+        // it would be no-op, if hflush is not supported by a given writer.
+        LOG.debug("skipping hflush, since the writer doesn't support it");
+      }
+    } else {
+      // this is done for backward compatibility in order to make it work with
+      // older versions of Hadoop.
+      syncable.hflush();
+    }
+  }
+}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 4372d8e..418bc3c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.StreamHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -130,7 +131,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     }
     try {
       if (outputStream != null) {
-        outputStream.hflush();
+        StreamHelper.hflushIfSupported(outputStream);
         outputStream.close();
       }
     } catch (IOException ioe) {
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index 869b603..5b7591b 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile.Writer;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
+import org.apache.tez.common.StreamHelper;
 
 public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
   private final Path filePath;
@@ -61,7 +62,7 @@ public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
   }
 
   public void hflush() throws IOException {
-    writer.hflush();
+    StreamHelper.hflushIfSupported(writer);
   }
 
   @Override