You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/04 00:37:44 UTC
[3/9] hadoop git commit: HADOOP-7139. Allow appending to existing
SequenceFiles (Contributed by kanaka kumar avvaru)
HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)
(cherry picked from commit 295d678be8853a52c3ec3da43d9265478d6632b3)
(cherry picked from commit 80697e4f324948ec32b4cad3faccba55287be652)
(cherry picked from commit b3546b60340e085c5abd8f8f0990d45c7445fe07)
Conflicts:
hadoop-common-project/hadoop-common/CHANGES.txt
(cherry picked from commit e9c8d8c58516aa64589cd44e9e5dd0a23ba72a17)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f53c98c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f53c98c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f53c98c
Branch: refs/heads/branch-2.6.1
Commit: 4f53c98ca4b6fa4b75935e743df7aae6b54366ce
Parents: 193d8d3
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jun 18 14:39:00 2015 +0530
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:15:23 2015 -0700
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 3 +
.../java/org/apache/hadoop/io/SequenceFile.java | 85 ++++-
.../hadoop/io/TestSequenceFileAppend.java | 311 +++++++++++++++++++
3 files changed, 394 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 107a95a..c3d18a1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -8,6 +8,9 @@ Release 2.6.1 - UNRELEASED
IMPROVEMENTS
+ HADOOP-7139. Allow appending to existing SequenceFiles
+ (kanaka kumar avvaru via vinayakumarb)
+
OPTIMIZATIONS
HADOOP-11238. Update the NameNode's Group Cache in the background when
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 35fc130..153856d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -835,7 +835,9 @@ public class SequenceFile {
DataOutputStream deflateOut = null;
Metadata metadata = null;
Compressor compressor = null;
-
+
+ private boolean appendMode = false;
+
protected Serializer keySerializer;
protected Serializer uncompressedValSerializer;
protected Serializer compressedValSerializer;
@@ -907,6 +909,13 @@ public class SequenceFile {
}
}
+ static class AppendIfExistsOption extends Options.BooleanOption implements
+ Option {
+ AppendIfExistsOption(boolean value) {
+ super(value);
+ }
+ }
+
static class KeyClassOption extends Options.ClassOption implements Option {
KeyClassOption(Class<?> value) {
super(value);
@@ -956,7 +965,7 @@ public class SequenceFile {
return codec;
}
}
-
+
public static Option file(Path value) {
return new FileOption(value);
}
@@ -982,6 +991,10 @@ public class SequenceFile {
return new ReplicationOption(value);
}
+ public static Option appendIfExists(boolean value) {
+ return new AppendIfExistsOption(value);
+ }
+
public static Option blockSize(long value) {
return new BlockSizeOption(value);
}
@@ -1028,6 +1041,8 @@ public class SequenceFile {
ProgressableOption progressOption =
Options.getOption(ProgressableOption.class, opts);
FileOption fileOption = Options.getOption(FileOption.class, opts);
+ AppendIfExistsOption appendIfExistsOption = Options.getOption(
+ AppendIfExistsOption.class, opts);
FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
StreamOption streamOption = Options.getOption(StreamOption.class, opts);
KeyClassOption keyClassOption =
@@ -1069,7 +1084,54 @@ public class SequenceFile {
blockSizeOption.getValue();
Progressable progress = progressOption == null ? null :
progressOption.getValue();
- out = fs.create(p, true, bufferSize, replication, blockSize, progress);
+
+ if (appendIfExistsOption != null && appendIfExistsOption.getValue()
+ && fs.exists(p)) {
+
+ // Read the file and verify header details
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
+ try {
+
+ if (keyClassOption.getValue() != reader.getKeyClass()
+ || valueClassOption.getValue() != reader.getValueClass()) {
+ throw new IllegalArgumentException(
+ "Key/value class provided does not match the file");
+ }
+
+ if (reader.getVersion() != VERSION[3]) {
+ throw new VersionMismatchException(VERSION[3],
+ reader.getVersion());
+ }
+
+ if (metadataOption != null) {
+ LOG.info("MetaData Option is ignored during append");
+ }
+ metadataOption = (MetadataOption) SequenceFile.Writer
+ .metadata(reader.getMetadata());
+
+ CompressionOption readerCompressionOption = new CompressionOption(
+ reader.getCompressionType(), reader.getCompressionCodec());
+
+ if (readerCompressionOption.value != compressionTypeOption.value
+ || !readerCompressionOption.codec.getClass().getName()
+ .equals(compressionTypeOption.codec.getClass().getName())) {
+ throw new IllegalArgumentException(
+ "Compression option provided does not match the file");
+ }
+
+ sync = reader.getSync();
+
+ } finally {
+ reader.close();
+ }
+
+ out = fs.append(p, bufferSize, progress);
+ this.appendMode = true;
+ } else {
+ out = fs
+ .create(p, true, bufferSize, replication, blockSize, progress);
+ }
} else {
out = streamOption.getValue();
}
@@ -1157,7 +1219,7 @@ public class SequenceFile {
out.write(sync); // write the sync bytes
out.flush(); // flush header
}
-
+
/** Initialize. */
@SuppressWarnings("unchecked")
void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
@@ -1212,7 +1274,12 @@ public class SequenceFile {
}
this.compressedValSerializer.open(deflateOut);
}
- writeFileHeader();
+
+ if (appendMode) {
+ sync();
+ } else {
+ writeFileHeader();
+ }
}
/** Returns the class of keys in this file. */
@@ -2043,6 +2110,14 @@ public class SequenceFile {
/** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; }
+ private byte[] getSync() {
+ return sync;
+ }
+
+ private byte getVersion() {
+ return version;
+ }
+
/**
* Get the compression type for this file.
* @return the compression type
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
new file mode 100644
index 0000000..4576642
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileAppend {
+
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static Path ROOT_PATH = new Path(System.getProperty(
+ "test.build.data", "build/test/data"));
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ conf = new Configuration();
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization");
+ conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+ fs = FileSystem.get(conf);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ fs.close();
+ }
+
+ @Test(timeout = 30000)
+ public void testAppend() throws Exception {
+
+ Path file = new Path(ROOT_PATH, "testseqappend.seq");
+ fs.delete(file, true);
+
+ Text key1 = new Text("Key1");
+ Text value1 = new Text("Value1");
+ Text value2 = new Text("Updated");
+
+ SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+ metadata.set(key1, value1);
+ Writer.Option metadataOption = Writer.metadata(metadata);
+
+ Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class), metadataOption);
+
+ writer.append(1L, "one");
+ writer.append(2L, "two");
+ writer.close();
+
+ verify2Values(file);
+
+ metadata.set(key1, value2);
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), metadataOption);
+
+ // Verify the metadata is not changed
+ assertEquals(value1, writer.metadata.get(key1));
+
+ writer.append(3L, "three");
+ writer.append(4L, "four");
+
+ writer.close();
+
+ verifyAll4Values(file);
+
+ // Verify the metadata is readable after append
+ Reader reader = new Reader(conf, Reader.file(file));
+ assertEquals(value1, reader.getMetadata().get(key1));
+ reader.close();
+
+ // Verify failure if the compression details are different
+ try {
+ Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+ new GzipCodec());
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+ writer.close();
+ fail("Expected IllegalArgumentException for compression options");
+ } catch (IllegalArgumentException IAE) {
+ // Expected exception. Ignore it
+ }
+
+ try {
+ Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+ new DefaultCodec());
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+ writer.close();
+ fail("Expected IllegalArgumentException for compression options");
+ } catch (IllegalArgumentException IAE) {
+ // Expected exception. Ignore it
+ }
+
+ fs.deleteOnExit(file);
+ }
+
+ @Test(timeout = 30000)
+ public void testAppendRecordCompression() throws Exception {
+
+ Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+ fs.delete(file, true);
+
+ Option compressOption = Writer.compression(CompressionType.RECORD,
+ new GzipCodec());
+ Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class), compressOption);
+
+ writer.append(1L, "one");
+ writer.append(2L, "two");
+ writer.close();
+
+ verify2Values(file);
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), compressOption);
+
+ writer.append(3L, "three");
+ writer.append(4L, "four");
+ writer.close();
+
+ verifyAll4Values(file);
+
+ fs.deleteOnExit(file);
+ }
+
+ @Test(timeout = 30000)
+ public void testAppendBlockCompression() throws Exception {
+
+ Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+ fs.delete(file, true);
+
+ Option compressOption = Writer.compression(CompressionType.BLOCK,
+ new GzipCodec());
+ Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class), compressOption);
+
+ writer.append(1L, "one");
+ writer.append(2L, "two");
+ writer.close();
+
+ verify2Values(file);
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), compressOption);
+
+ writer.append(3L, "three");
+ writer.append(4L, "four");
+ writer.close();
+
+ verifyAll4Values(file);
+
+ // Verify failure if the compression details are different or not provided
+ try {
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true));
+ writer.close();
+ fail("Expected IllegalArgumentException for compression options");
+ } catch (IllegalArgumentException IAE) {
+ // Expected exception. Ignore it
+ }
+
+ // Verify failure if the compression details are different
+ try {
+ Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+ new GzipCodec());
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+ writer.close();
+ fail("Expected IllegalArgumentException for compression options");
+ } catch (IllegalArgumentException IAE) {
+ // Expected exception. Ignore it
+ }
+
+ try {
+ Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+ new DefaultCodec());
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+ writer.close();
+ fail("Expected IllegalArgumentException for compression options");
+ } catch (IllegalArgumentException IAE) {
+ // Expected exception. Ignore it
+ }
+
+ fs.deleteOnExit(file);
+ }
+
+ @Test(timeout = 30000)
+ public void testAppendSort() throws Exception {
+ Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
+ fs.delete(file, true);
+
+ Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort");
+ fs.delete(sortedFile, true);
+
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+ new JavaSerializationComparator<Long>(), Long.class, String.class, conf);
+
+ Option compressOption = Writer.compression(CompressionType.BLOCK,
+ new GzipCodec());
+ Writer writer = SequenceFile.createWriter(conf,
+ SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class), compressOption);
+
+ writer.append(2L, "two");
+ writer.append(1L, "one");
+
+ writer.close();
+
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+ SequenceFile.Writer.keyClass(Long.class),
+ SequenceFile.Writer.valueClass(String.class),
+ SequenceFile.Writer.appendIfExists(true), compressOption);
+
+ writer.append(4L, "four");
+ writer.append(3L, "three");
+ writer.close();
+
+ // Sort file after append
+ sorter.sort(file, sortedFile);
+ verifyAll4Values(sortedFile);
+
+ fs.deleteOnExit(file);
+ fs.deleteOnExit(sortedFile);
+ }
+
+ private void verify2Values(Path file) throws IOException {
+ Reader reader = new Reader(conf, Reader.file(file));
+ assertEquals(1L, reader.next((Object) null));
+ assertEquals("one", reader.getCurrentValue((Object) null));
+ assertEquals(2L, reader.next((Object) null));
+ assertEquals("two", reader.getCurrentValue((Object) null));
+ assertNull(reader.next((Object) null));
+ reader.close();
+ }
+
+ private void verifyAll4Values(Path file) throws IOException {
+ Reader reader = new Reader(conf, Reader.file(file));
+ assertEquals(1L, reader.next((Object) null));
+ assertEquals("one", reader.getCurrentValue((Object) null));
+ assertEquals(2L, reader.next((Object) null));
+ assertEquals("two", reader.getCurrentValue((Object) null));
+ assertEquals(3L, reader.next((Object) null));
+ assertEquals("three", reader.getCurrentValue((Object) null));
+ assertEquals(4L, reader.next((Object) null));
+ assertEquals("four", reader.getCurrentValue((Object) null));
+ assertNull(reader.next((Object) null));
+ reader.close();
+ }
+}