You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2009/03/26 09:23:56 UTC
svn commit: r758551 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/fs/kfs/ src/test/org/apache/hadoop/fs/kfs/
Author: cdouglas
Date: Thu Mar 26 08:23:48 2009
New Revision: 758551
URL: http://svn.apache.org/viewvc?rev=758551&view=rev
Log:
HADOOP-5331. Add support for KFS appends. Contributed by Sriram Rao
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Mar 26 08:23:48 2009
@@ -190,6 +190,8 @@
HADOOP-5423. Include option of preserving file metadata in
SequenceFile::sort. (Michael Tamm via cdouglas)
+ HADOOP-5331. Add support for KFS appends. (Sriram Rao via cdouglas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java Thu Mar 26 08:23:48 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
interface IFSImpl {
public boolean exists(String path) throws IOException;
@@ -52,7 +53,8 @@
public String[][] getDataLocation(String path, long start, long len) throws IOException;
public long getModificationTime(String path) throws IOException;
- public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException;
+ public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException;
public FSDataInputStream open(String path, int bufferSize) throws IOException;
+ public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException;
};
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java Thu Mar 26 08:23:48 2009
@@ -29,6 +29,7 @@
import org.kosmix.kosmosfs.access.KfsAccess;
import org.kosmix.kosmosfs.access.KfsFileAttr;
+import org.apache.hadoop.util.Progressable;
class KFSImpl implements IFSImpl {
private KfsAccess kfsAccess = null;
@@ -132,13 +133,19 @@
return kfsAccess.kfs_getModificationTime(path);
}
- public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
- return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication),
- statistics);
- }
-
public FSDataInputStream open(String path, int bufferSize) throws IOException {
return new FSDataInputStream(new KFSInputStream(kfsAccess, path,
statistics));
}
+
+ public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
+ return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication, false, progress),
+ statistics);
+ }
+
+ public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
+ // when opening for append, # of replicas is ignored
+ return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, (short) 1, true, progress),
+ statistics);
+ }
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java Thu Mar 26 08:23:48 2009
@@ -37,11 +37,17 @@
private String path;
private KfsOutputChannel kfsChannel;
+ private Progressable progressReporter;
- public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) {
+ public KFSOutputStream(KfsAccess kfsAccess, String path, short replication,
+ boolean append, Progressable prog) {
this.path = path;
- this.kfsChannel = kfsAccess.kfs_create(path, replication);
+ if ((append) && (kfsAccess.kfs_isFile(path)))
+ this.kfsChannel = kfsAccess.kfs_append(path);
+ else
+ this.kfsChannel = kfsAccess.kfs_create(path, replication);
+ this.progressReporter = prog;
}
public long getPos() throws IOException {
@@ -66,6 +72,8 @@
throw new IOException("File closed");
}
+ // touch the progress before going into KFS since the call can block
+ progressReporter.progress();
kfsChannel.write(ByteBuffer.wrap(b, off, len));
}
@@ -73,6 +81,8 @@
if (kfsChannel == null) {
throw new IOException("File closed");
}
+ // touch the progress before going into KFS since the call can block
+ progressReporter.progress();
kfsChannel.sync();
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Thu Mar 26 08:23:48 2009
@@ -168,11 +168,18 @@
}
}
- /** This optional operation is not yet supported. */
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
- throw new IOException("Not supported");
+ Path parent = f.getParent();
+ if (parent != null && !mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " + parent);
+ }
+
+ Path absolute = makeAbsolute(f);
+ String srep = absolute.toUri().getPath();
+
+ return kfsImpl.append(srep, bufferSize, progress);
}
@Override
@@ -197,7 +204,7 @@
Path absolute = makeAbsolute(file);
String srep = absolute.toUri().getPath();
- return kfsImpl.create(srep, replication, bufferSize);
+ return kfsImpl.create(srep, replication, bufferSize, progress);
}
@Override
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Thu Mar 26 08:23:48 2009
@@ -30,7 +30,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.util.Progressable;
public class KFSEmulationImpl implements IFSImpl {
FileSystem localFS;
@@ -130,7 +130,13 @@
return s.getModificationTime();
}
- public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+ public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
+ // besides path/overwrite, the other args don't matter for
+ // testing purposes.
+ return localFS.append(new Path(path));
+ }
+
+ public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
// besides path/overwrite, the other args don't matter for
// testing purposes.
return localFS.create(new Path(path));
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=758551&r1=758550&r2=758551&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Thu Mar 26 08:23:48 2009
@@ -147,6 +147,7 @@
// Read the stuff back and verify it is correct
FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
int v;
+ long nread = 0;
v = s2.read();
assertEquals(v, 32);
@@ -161,6 +162,8 @@
byte[] buf = new byte[bufsz];
s2.read(buf, 0, buf.length);
+ nread = s2.getPos();
+
for (int i = 0; i < data.length; i++)
assertEquals(data[i], buf[i]);
@@ -168,6 +171,28 @@
s2.close();
+ // append some data to the file
+ try {
+ s1 = kosmosFileSystem.append(file1);
+ for (int i = 0; i < data.length; i++)
+ data[i] = (byte) (i % 17);
+ // write the data
+ s1.write(data, 0, data.length);
+ // flush out the changes
+ s1.close();
+
+ // read it back and validate
+ s2 = kosmosFileSystem.open(file1, 4096);
+ s2.seek(nread);
+ s2.read(buf, 0, buf.length);
+ for (int i = 0; i < data.length; i++)
+ assertEquals(data[i], buf[i]);
+
+ s2.close();
+ } catch (Exception e) {
+ System.out.println("append isn't supported by the underlying fs");
+ }
+
kosmosFileSystem.delete(file1, true);
assertFalse(kosmosFileSystem.exists(file1));
kosmosFileSystem.delete(subDir1, true);