You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/05/04 08:59:56 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17657: implement
StreamCapabilities in SequenceFile.Writer and fall back to flush,
if hflush is not supported (#2949)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 98aa4fc HADOOP-17657: implement StreamCapabilities in SequenceFile.Writer and fall back to flush, if hflush is not supported (#2949)
98aa4fc is described below
commit 98aa4fc32c0dfede20f57a7a05297a9e22d2f7d7
Author: kishendas <ki...@gmail.com>
AuthorDate: Tue May 4 01:20:56 2021 -0700
HADOOP-17657: implement StreamCapabilities in SequenceFile.Writer and fall back to flush, if hflush is not supported (#2949)
Co-authored-by: Kishen Das <ki...@cloudera.com>
Reviewed-by: Steve Loughran <st...@apache.org>
(cherry picked from commit e571025f5b371ade25d1457f0186ba656bb71c5f)
---
.../java/org/apache/hadoop/io/SequenceFile.java | 19 +++++++++++++++-
.../org/apache/hadoop/io/TestSequenceFile.java | 26 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 3f4649f..0581fb3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -27,6 +27,7 @@ import java.security.MessageDigest;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -834,7 +835,8 @@ public class SequenceFile {
}
/** Write key/value pairs to a sequence-format file. */
- public static class Writer implements java.io.Closeable, Syncable {
+ public static class Writer implements java.io.Closeable, Syncable,
+ Flushable, StreamCapabilities {
private Configuration conf;
FSDataOutputStream out;
boolean ownOutputStream = true;
@@ -1367,6 +1369,21 @@ public class SequenceFile {
out.hflush();
}
}
+
+ @Override
+ public void flush() throws IOException { // forwards to the wrapped stream
+ if (out != null) { // nothing to flush when no stream is set
+ out.flush();
+ }
+ }
+
+ @Override
+ public boolean hasCapability(String capability) { // answered by the wrapped stream
+ if (out !=null && capability != null) {
+ return out.hasCapability(capability);
+ }
+ return false; // no stream or null capability name: report unsupported
+ }
/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java
index 0448243..5e4d578 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFile.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.conf.*;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -724,6 +725,31 @@ public class TestSequenceFile {
}
}
+ @Test
+ public void testSequenceFileWriter() throws Exception { // capability probes + flush/hflush/hsync on a Writer
+ Configuration conf = new Configuration();
+ // This test only works with Raw File System and not Local File System
+ FileSystem fs = FileSystem.getLocal(conf).getRaw();
+ Path p = new Path(GenericTestUtils
+ .getTempPath("testSequenceFileWriter.seq"));
+ try(SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, conf, p, LongWritable.class, Text.class)) {
+ Assertions.assertThat(writer.hasCapability
+ (StreamCapabilities.HSYNC)).isEqualTo(true); // writer must report HSYNC
+ Assertions.assertThat(writer.hasCapability(
+ StreamCapabilities.HFLUSH)).isEqualTo(true); // writer must report HFLUSH
+ LongWritable key = new LongWritable();
+ key.set(1);
+ Text value = new Text();
+ value.set("somevalue");
+ writer.append(key, value); // write a single record before flushing
+ writer.flush(); // call all three flush variants in turn
+ writer.hflush();
+ writer.hsync();
+ Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0); // length checked while the writer is still open
+ }
+ }
+
/** For debugging and testing. */
public static void main(String[] args) throws Exception {
int count = 1024 * 1024;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org