You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/04 00:37:44 UTC

[3/9] hadoop git commit: HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

HADOOP-7139. Allow appending to existing SequenceFiles (Contributed by kanaka kumar avvaru)

(cherry picked from commit 295d678be8853a52c3ec3da43d9265478d6632b3)
(cherry picked from commit 80697e4f324948ec32b4cad3faccba55287be652)
(cherry picked from commit b3546b60340e085c5abd8f8f0990d45c7445fe07)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt

(cherry picked from commit e9c8d8c58516aa64589cd44e9e5dd0a23ba72a17)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f53c98c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f53c98c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f53c98c

Branch: refs/heads/branch-2.6.1
Commit: 4f53c98ca4b6fa4b75935e743df7aae6b54366ce
Parents: 193d8d3
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jun 18 14:39:00 2015 +0530
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 14:15:23 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../java/org/apache/hadoop/io/SequenceFile.java |  85 ++++-
 .../hadoop/io/TestSequenceFileAppend.java       | 311 +++++++++++++++++++
 3 files changed, 394 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 107a95a..c3d18a1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -8,6 +8,9 @@ Release 2.6.1 - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-7139. Allow appending to existing SequenceFiles
+    (kanaka kumar avvaru via vinayakumarb)
+
   OPTIMIZATIONS
 
     HADOOP-11238. Update the NameNode's Group Cache in the background when

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
index 35fc130..153856d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
@@ -835,7 +835,9 @@ public class SequenceFile {
     DataOutputStream deflateOut = null;
     Metadata metadata = null;
     Compressor compressor = null;
-    
+
+    private boolean appendMode = false;
+
     protected Serializer keySerializer;
     protected Serializer uncompressedValSerializer;
     protected Serializer compressedValSerializer;
@@ -907,6 +909,13 @@ public class SequenceFile {
       }
     }
 
+    static class AppendIfExistsOption extends Options.BooleanOption implements
+        Option {
+      AppendIfExistsOption(boolean value) {
+        super(value);
+      }
+    }
+
     static class KeyClassOption extends Options.ClassOption implements Option {
       KeyClassOption(Class<?> value) {
         super(value);
@@ -956,7 +965,7 @@ public class SequenceFile {
         return codec;
       }
     }
-    
+
     public static Option file(Path value) {
       return new FileOption(value);
     }
@@ -982,6 +991,10 @@ public class SequenceFile {
       return new ReplicationOption(value);
     }
     
+    public static Option appendIfExists(boolean value) {
+      return new AppendIfExistsOption(value);
+    }
+
     public static Option blockSize(long value) {
       return new BlockSizeOption(value);
     }
@@ -1028,6 +1041,8 @@ public class SequenceFile {
       ProgressableOption progressOption = 
         Options.getOption(ProgressableOption.class, opts);
       FileOption fileOption = Options.getOption(FileOption.class, opts);
+      AppendIfExistsOption appendIfExistsOption = Options.getOption(
+          AppendIfExistsOption.class, opts);
       FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
       StreamOption streamOption = Options.getOption(StreamOption.class, opts);
       KeyClassOption keyClassOption = 
@@ -1069,7 +1084,54 @@ public class SequenceFile {
           blockSizeOption.getValue();
         Progressable progress = progressOption == null ? null :
           progressOption.getValue();
-        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
+
+        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
+            && fs.exists(p) && fs.getFileStatus(p).getLen() > 0) {
+          // A zero-length file has no header to validate, so it takes the
+          // create branch below; otherwise read and verify the header first.
+          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
+          try {
+            if (keyClassOption.getValue() != reader.getKeyClass()
+                || valueClassOption.getValue() != reader.getValueClass()) {
+              throw new IllegalArgumentException(
+                  "Key/value class provided does not match the file");
+            }
+
+            if (reader.getVersion() != VERSION[3]) {
+              throw new VersionMismatchException(VERSION[3],
+                  reader.getVersion());
+            }
+
+            if (metadataOption != null) {
+              LOG.info("MetaData Option is ignored during append");
+            }
+            metadataOption = (MetadataOption) SequenceFile.Writer
+                .metadata(reader.getMetadata());
+
+            CompressionOption readerCompressionOption = new CompressionOption(
+                reader.getCompressionType(), reader.getCompressionCodec());
+            // The codec of an uncompressed (NONE) file may be null, so codecs
+            // are compared only when compression is enabled.
+            if (readerCompressionOption.value != compressionTypeOption.value
+                || (readerCompressionOption.value != CompressionType.NONE
+                    && !readerCompressionOption.codec.getClass().getName()
+                        .equals(compressionTypeOption.codec.getClass()
+                            .getName()))) {
+              throw new IllegalArgumentException(
+                  "Compression option provided does not match the file");
+            }
+            sync = reader.getSync();
+          } finally {
+            reader.close();
+          }
+
+          out = fs.append(p, bufferSize, progress);
+          this.appendMode = true;
+        } else {
+          out = fs
+              .create(p, true, bufferSize, replication, blockSize, progress);
+        }
       } else {
         out = streamOption.getValue();
       }
@@ -1157,7 +1219,7 @@ public class SequenceFile {
       out.write(sync);                       // write the sync bytes
       out.flush();                           // flush header
     }
-    
+
     /** Initialize. */
     @SuppressWarnings("unchecked")
     void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
@@ -1212,7 +1274,12 @@ public class SequenceFile {
         }
         this.compressedValSerializer.open(deflateOut);
       }
-      writeFileHeader();
+
+      if (appendMode) {
+        sync();
+      } else {
+        writeFileHeader();
+      }
     }
     
     /** Returns the class of keys in this file. */
@@ -2043,6 +2110,14 @@ public class SequenceFile {
     /** Returns the compression codec of data in this file. */
     public CompressionCodec getCompressionCodec() { return codec; }
     
+    private byte[] getSync() {
+      return sync;
+    }
+
+    private byte getVersion() {
+      return version;
+    }
+
     /**
      * Get the compression type for this file.
      * @return the compression type

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f53c98c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
new file mode 100644
index 0000000..4576642
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileAppend.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSequenceFileAppend {
+
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static final Path ROOT_PATH = new Path(System.getProperty(
+      "test.build.data", "build/test/data"));
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    fs.close();
+  }
+
+  @Test(timeout = 30000)
+  public void testAppend() throws Exception {
+    Path file = new Path(ROOT_PATH, "testseqappend.seq");
+    fs.delete(file, true);
+
+    Text key1 = new Text("Key1");
+    Text value1 = new Text("Value1");
+    Text value2 = new Text("Updated");
+
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    metadata.set(key1, value1);
+    Writer.Option metadataOption = Writer.metadata(metadata);
+
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), metadataOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    metadata.set(key1, value2);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), metadataOption);
+
+    // Verify the Meta data is not changed
+    assertEquals(value1, writer.metadata.get(key1));
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify the Meta data readable after append
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(value1, reader.getMetadata().get(key1));
+    reader.close();
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException expected) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException expected) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendRecordCompression() throws Exception {
+    // SequenceFile only supports GzipCodec when native-hadoop code is loaded.
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+    Path file = new Path(ROOT_PATH, "testseqappendrecordcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.RECORD,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendBlockCompression() throws Exception {
+    // SequenceFile only supports GzipCodec when native-hadoop code is loaded.
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+    Path file = new Path(ROOT_PATH, "testseqappendblockcompr.seq");
+    fs.delete(file, true);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(1L, "one");
+    writer.append(2L, "two");
+    writer.close();
+
+    verify2Values(file);
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(3L, "three");
+    writer.append(4L, "four");
+    writer.close();
+
+    verifyAll4Values(file);
+
+    // Verify failure if the compression details are different or not Provided
+    try {
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true));
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException expected) {
+      // Expected exception. Ignore it
+    }
+
+    // Verify failure if the compression details are different
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.RECORD,
+          new GzipCodec());
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException expected) {
+      // Expected exception. Ignore it
+    }
+
+    try {
+      Option wrongCompressOption = Writer.compression(CompressionType.BLOCK,
+          new DefaultCodec());
+      writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+          SequenceFile.Writer.keyClass(Long.class),
+          SequenceFile.Writer.valueClass(String.class),
+          SequenceFile.Writer.appendIfExists(true), wrongCompressOption);
+      writer.close();
+      fail("Expected IllegalArgumentException for compression options");
+    } catch (IllegalArgumentException expected) {
+      // Expected exception. Ignore it
+    }
+
+    fs.deleteOnExit(file);
+  }
+
+  @Test(timeout = 30000)
+  public void testAppendSort() throws Exception {
+    // SequenceFile only supports GzipCodec when native-hadoop code is loaded.
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
+    Path file = new Path(ROOT_PATH, "testseqappendSort.seq");
+    fs.delete(file, true);
+
+    Path sortedFile = new Path(ROOT_PATH, "testseqappendSort.seq.sort");
+    fs.delete(sortedFile, true);
+
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+        new JavaSerializationComparator<Long>(), Long.class, String.class, conf);
+
+    Option compressOption = Writer.compression(CompressionType.BLOCK,
+        new GzipCodec());
+    Writer writer = SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class), compressOption);
+
+    writer.append(2L, "two");
+    writer.append(1L, "one");
+
+    writer.close();
+
+    writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(file),
+        SequenceFile.Writer.keyClass(Long.class),
+        SequenceFile.Writer.valueClass(String.class),
+        SequenceFile.Writer.appendIfExists(true), compressOption);
+
+    writer.append(4L, "four");
+    writer.append(3L, "three");
+    writer.close();
+
+    // Sort file after append
+    sorter.sort(file, sortedFile);
+    verifyAll4Values(sortedFile);
+
+    fs.deleteOnExit(file);
+    fs.deleteOnExit(sortedFile);
+  }
+
+  private void verify2Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+
+  private void verifyAll4Values(Path file) throws IOException {
+    Reader reader = new Reader(conf, Reader.file(file));
+    assertEquals(1L, reader.next((Object) null));
+    assertEquals("one", reader.getCurrentValue((Object) null));
+    assertEquals(2L, reader.next((Object) null));
+    assertEquals("two", reader.getCurrentValue((Object) null));
+    assertEquals(3L, reader.next((Object) null));
+    assertEquals("three", reader.getCurrentValue((Object) null));
+    assertEquals(4L, reader.next((Object) null));
+    assertEquals("four", reader.getCurrentValue((Object) null));
+    assertNull(reader.next((Object) null));
+    reader.close();
+  }
+}