You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/26 09:23:56 UTC

svn commit: r758551 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/kfs/ src/test/org/apache/hadoop/fs/kfs/

Author: cdouglas
Date: Thu Mar 26 08:23:48 2009
New Revision: 758551

URL: http://svn.apache.org/viewvc?rev=758551&view=rev
Log:
HADOOP-5331. Add support for KFS appends. Contributed by Sriram Rao

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 26 08:23:48 2009
@@ -190,6 +190,8 @@
     HADOOP-5423. Include option of preserving file metadata in
     SequenceFile::sort. (Michael Tamm via cdouglas)
 
+    HADOOP-5331. Add support for KFS appends. (Sriram Rao via cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java Thu Mar 26 08:23:48 2009
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
 
 interface IFSImpl {
     public boolean exists(String path) throws IOException;
@@ -52,7 +53,8 @@
     public String[][] getDataLocation(String path, long start, long len) throws IOException;
 
     public long getModificationTime(String path) throws IOException;
-    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException;
+    public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException;
     public FSDataInputStream open(String path, int bufferSize) throws IOException;
+    public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException;
     
 };

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java Thu Mar 26 08:23:48 2009
@@ -29,6 +29,7 @@
 
 import org.kosmix.kosmosfs.access.KfsAccess;
 import org.kosmix.kosmosfs.access.KfsFileAttr;
+import org.apache.hadoop.util.Progressable;
 
 class KFSImpl implements IFSImpl {
     private KfsAccess kfsAccess = null;
@@ -132,13 +133,19 @@
         return kfsAccess.kfs_getModificationTime(path);
     }
 
-    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
-        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication), 
-                                      statistics);
-    }
-
     public FSDataInputStream open(String path, int bufferSize) throws IOException {
         return new FSDataInputStream(new KFSInputStream(kfsAccess, path, 
                                                         statistics));
     }
+
+    public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
+        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication, false, progress), 
+                                      statistics);
+    }
+
+    public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
+        // replication is ignored when appending to an existing file; a missing file is created with 1 replica
+        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, (short) 1, true, progress), 
+                                      statistics);
+    }
 }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java Thu Mar 26 08:23:48 2009
@@ -37,11 +37,17 @@
 
     private String path;
     private KfsOutputChannel kfsChannel;
+    private Progressable progressReporter;
 
-    public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) {
+    public KFSOutputStream(KfsAccess kfsAccess, String path, short replication,
+                           boolean append, Progressable prog) {
         this.path = path;
 
-        this.kfsChannel = kfsAccess.kfs_create(path, replication);
+        if ((append) && (kfsAccess.kfs_isFile(path)))
+                this.kfsChannel = kfsAccess.kfs_append(path);
+        else
+                this.kfsChannel = kfsAccess.kfs_create(path, replication);
+        this.progressReporter = prog;
     }
 
     public long getPos() throws IOException {
@@ -66,6 +72,8 @@
             throw new IOException("File closed");
         }
 
+        // report progress before calling into KFS, since the call can block
+        progressReporter.progress();
         kfsChannel.write(ByteBuffer.wrap(b, off, len));
     }
 
@@ -73,6 +81,8 @@
         if (kfsChannel == null) {
             throw new IOException("File closed");
         }
+        // report progress before calling into KFS, since the call can block
+        progressReporter.progress();
         kfsChannel.sync();
     }
 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Thu Mar 26 08:23:48 2009
@@ -168,11 +168,18 @@
         }
     }
     
-    /** This optional operation is not yet supported. */
     @Override
     public FSDataOutputStream append(Path f, int bufferSize,
         Progressable progress) throws IOException {
-      throw new IOException("Not supported");
+        Path parent = f.getParent();
+        if (parent != null && !mkdirs(parent)) {
+            throw new IOException("Mkdirs failed to create " + parent);
+        }
+
+        Path absolute = makeAbsolute(f);
+        String srep = absolute.toUri().getPath();
+
+        return kfsImpl.append(srep, bufferSize, progress);
     }
 
     @Override
@@ -197,7 +204,7 @@
         Path absolute = makeAbsolute(file);
         String srep = absolute.toUri().getPath();
 
-        return kfsImpl.create(srep, replication, bufferSize);
+        return kfsImpl.create(srep, replication, bufferSize, progress);
     }
 
     @Override

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Thu Mar 26 08:23:48 2009
@@ -30,7 +30,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.util.Progressable;
 
 public class KFSEmulationImpl implements IFSImpl {
     FileSystem localFS;
@@ -130,7 +130,13 @@
         return s.getModificationTime();
     }
 
-    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+    public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
+        // besides the path, the other args don't matter for
+        // testing purposes.
+        return localFS.append(new Path(path));
+    }
+
+    public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
         // besides path/overwrite, the other args don't matter for
         // testing purposes.
         return localFS.create(new Path(path));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Thu Mar 26 08:23:48 2009
@@ -147,6 +147,7 @@
         // Read the stuff back and verify it is correct
         FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
         int v;
+        long nread = 0;
 
         v = s2.read();
         assertEquals(v, 32);
@@ -161,6 +162,8 @@
 
         byte[] buf = new byte[bufsz];
         s2.read(buf, 0, buf.length);
+        nread = s2.getPos();
+
         for (int i = 0; i < data.length; i++)
             assertEquals(data[i], buf[i]);
 
@@ -168,6 +171,28 @@
 
         s2.close();
 
+        // append some data to the file
+        try {
+            s1 = kosmosFileSystem.append(file1);
+            for (int i = 0; i < data.length; i++)
+                data[i] = (byte) (i % 17);
+            // write the data
+            s1.write(data, 0, data.length);
+            // flush out the changes
+            s1.close();
+
+            // read it back and validate
+            s2 = kosmosFileSystem.open(file1, 4096);
+            s2.seek(nread);
+            s2.read(buf, 0, buf.length);
+            for (int i = 0; i < data.length; i++)
+                assertEquals(data[i], buf[i]);
+
+            s2.close();
+        } catch (Exception e) {
+            System.out.println("append isn't supported by the underlying fs");
+        }
+
         kosmosFileSystem.delete(file1, true);
         assertFalse(kosmosFileSystem.exists(file1));        
         kosmosFileSystem.delete(subDir1, true);