You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/10/03 23:17:04 UTC

svn commit: r581725 - in /lucene/hadoop/trunk: ./ conf/ lib/ src/java/org/apache/hadoop/fs/kfs/ src/test/org/apache/hadoop/fs/kfs/

Author: cutting
Date: Wed Oct  3 14:17:00 2007
New Revision: 581725

URL: http://svn.apache.org/viewvc?rev=581725&view=rev
Log:
HADOOP-1963.  Add a FileSystem implementation for the Kosmos Filesystem (KFS).  Contributed by Sriram Rao.

Added:
    lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt
    lucene/hadoop/trunk/lib/kfs-0.1.jar   (with props)
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581725&r1=581724&r2=581725&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct  3 14:17:00 2007
@@ -74,6 +74,8 @@
     and codec, independent of the final output's compression
     parameters.  (Arun C Murthy via cutting)
 
+    HADOOP-1963.  Add a FileSystem implementation for the Kosmos
+    Filesystem (KFS).  (Sriram Rao via cutting)
 
   OPTIMIZATIONS
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581725&r1=581724&r2=581725&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct  3 14:17:00 2007
@@ -145,6 +145,12 @@
 </property>
 
 <property>
+  <name>fs.kfs.impl</name>
+  <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
+  <description>The FileSystem for kfs: uris.</description>
+</property>
+
+<property>
   <name>fs.hftp.impl</name>
   <value>org.apache.hadoop.dfs.HftpFileSystem</value>
 </property>

Added: lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt (added)
+++ lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt Wed Oct  3 14:17:00 2007
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

Added: lucene/hadoop/trunk/lib/kfs-0.1.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.jar?rev=581725&view=auto
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/lib/kfs-0.1.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * We need the ability to test the code in fs/kfs without having a
+ * real KFS deployment.
+ * around calls to KfsAccess object.  This is accomplished by defining a
+ * filesystem implementation interface:  
+ *   -- for testing purposes, a dummy implementation of this interface
+ * will suffice; as long as the dummy implementation is close enough
+ * to doing what KFS does, we are good.
+ *   -- for deployment purposes with KFS, this interface is
+ * implemented by the KfsImpl object.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+interface IFSImpl {
+    public boolean exists(String path) throws IOException;
+    public boolean isDirectory(String path) throws IOException;
+    public boolean isFile(String path) throws IOException;
+    public String[] readdir(String path) throws IOException;
+
+    public int mkdirs(String path) throws IOException;
+    public int rename(String source, String dest) throws IOException;
+
+    public int rmdir(String path) throws IOException; 
+    public int remove(String path) throws IOException;
+    public long filesize(String path) throws IOException;
+    public short getReplication(String path) throws IOException;
+    public short setReplication(String path, short replication) throws IOException;
+    public String[][] getDataLocation(String path, long start, long len) throws IOException;
+
+    public long getModificationTime(String path) throws IOException;
+    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException;
+    public FSDataInputStream open(String path, int bufferSize) throws IOException;
+    
+};

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * Provides the KFS implementation, which turns calls into calls to KfsAccess.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+
+class KFSImpl implements IFSImpl {
+    private KfsAccess kfsAccess = null;
+
+    public KFSImpl(String metaServerHost, int metaServerPort) throws IOException {
+        kfsAccess = new KfsAccess(metaServerHost, metaServerPort);
+    }
+
+    public boolean exists(String path) throws IOException {
+        return kfsAccess.kfs_exists(path);
+    }
+
+    public boolean isDirectory(String path) throws IOException {
+        return kfsAccess.kfs_isDirectory(path);
+    }
+
+    public boolean isFile(String path) throws IOException {
+        return kfsAccess.kfs_isFile(path);
+    }
+
+    public String[] readdir(String path) throws IOException {
+        return kfsAccess.kfs_readdir(path);
+    }
+
+    public int mkdirs(String path) throws IOException {
+        return kfsAccess.kfs_mkdirs(path);
+    }
+
+    public int rename(String source, String dest) throws IOException {
+        return kfsAccess.kfs_rename(source, dest);
+    }
+
+    public int rmdir(String path) throws IOException {
+        return kfsAccess.kfs_rmdir(path);
+    }
+
+    public int remove(String path) throws IOException {
+        return kfsAccess.kfs_remove(path);
+    }
+
+    public long filesize(String path) throws IOException {
+        return kfsAccess.kfs_filesize(path);
+    }
+
+    public short getReplication(String path) throws IOException {
+        return kfsAccess.kfs_getReplication(path);
+    }
+
+    public short setReplication(String path, short replication) throws IOException {
+        return kfsAccess.kfs_setReplication(path, replication);
+    }
+
+    public String[][] getDataLocation(String path, long start, long len) throws IOException {
+        return kfsAccess.kfs_getDataLocation(path, start, len);
+    }
+
+    public long getModificationTime(String path) throws IOException {
+        // Supporting this API requires changes to the Java-side of
+        // the KFS client API.  For now, return 0; in the next rev of
+        // KFS, we'll update the Java API.
+        return 0;
+    }
+
+    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication));
+    }
+
+    public FSDataInputStream open(String path, int bufferSize) throws IOException {
+        return new FSDataInputStream(new KFSInputStream(kfsAccess, path));
+    }
+};

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * Implements the Hadoop FSInputStream interfaces to allow applications to read
+ * files in Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+import org.kosmix.kosmosfs.access.KfsInputChannel;
+
+class KFSInputStream extends FSInputStream {
+
+    private String path;
+    private KfsInputChannel kfsChannel;
+
+    private long fsize;
+
+    public KFSInputStream(KfsAccess kfsAccess, String path) {
+        this.path = path;
+
+        this.kfsChannel = kfsAccess.kfs_open(path);
+        if (this.kfsChannel != null)
+            this.fsize = kfsAccess.kfs_filesize(path);
+        else
+            this.fsize = 0;
+    }
+
+    public long getPos() throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        return kfsChannel.tell();
+    }
+
+    public synchronized int available() throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        return (int) (this.fsize - getPos());
+    }
+
+    public synchronized void seek(long targetPos) throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        kfsChannel.seek(targetPos);
+    }
+
+    public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+    }
+
+    public synchronized int read() throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        byte b[] = new byte[4];
+        int res = read(b, 0, 4);
+        if (res == 4)
+            return (b[0] + (b[1] << 8) + (b[2] << 16) + (b[3] << 24));
+        return -1;
+    }
+
+    public synchronized int read(byte b[], int off, int len) throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+	int res;
+
+	res = kfsChannel.read(ByteBuffer.wrap(b, off, len));
+	// Use -1 to signify EOF
+	if (res == 0)
+	    return -1;
+	return res;
+    }
+
+    public synchronized void close() throws IOException {
+        if (kfsChannel == null) {
+            return;
+        }
+
+        kfsChannel.close();
+        kfsChannel = null;
+    }
+
+    public boolean markSupported() {
+        return false;
+    }
+
+    public void mark(int readLimit) {
+        // Do nothing
+    }
+
+    public void reset() throws IOException {
+        throw new IOException("Mark not supported");
+    }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
+ * files in Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+import org.kosmix.kosmosfs.access.KfsOutputChannel;
+
+class KFSOutputStream extends OutputStream {
+
+    private String path;
+    private KfsOutputChannel kfsChannel;
+
+    public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) {
+        this.path = path;
+
+        this.kfsChannel = kfsAccess.kfs_create(path, replication);
+    }
+
+    public long getPos() throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        return kfsChannel.tell();
+    }
+
+    public void write(int v) throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        byte[] b = new byte[4];
+
+        b[0] = (byte) (v & 0xFF);
+        b[1] = (byte) ((v >> 8) & 0xFF);
+        b[1] = (byte) ((v >> 16) & 0xFF);
+        b[1] = (byte) ((v >> 24) & 0xFF);
+        write(b, 0, 4);
+    }
+
+    public void write(byte b[], int off, int len) throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+
+        kfsChannel.write(ByteBuffer.wrap(b, off, len));
+    }
+
+    public void flush() throws IOException {
+        if (kfsChannel == null) {
+            throw new IOException("File closed");
+        }
+        kfsChannel.sync();
+    }
+
+    public synchronized void close() throws IOException {
+        if (kfsChannel == null) {
+            return;
+        }
+        flush();
+        kfsChannel.close();
+        kfsChannel = null;
+    }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,383 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * Implements the Hadoop FS interfaces to allow applications to store
+ * files in the Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A FileSystem backed by KFS.
+ *
+ */
+
+/**
+ * A Hadoop FileSystem backed by the Kosmos File System (KFS).
+ *
+ * Hadoop paths map 1:1 onto KFS paths; all operations are delegated to
+ * an IFSImpl backend (the JNI-backed KFSImpl in production, or an
+ * emulation injected by unit tests).
+ */
+public class KosmosFileSystem extends FileSystem {
+
+    private FileSystem localFs;        // used only by copyFrom/ToLocalFile
+    private IFSImpl kfsImpl = null;    // backend; may be injected by tests
+    private URI uri;
+    private Path workingDir = new Path("/");
+
+    public KosmosFileSystem() {
+
+    }
+
+    // Package-private: lets unit tests inject an emulated backend
+    // without connecting to a real KFS deployment.
+    KosmosFileSystem(IFSImpl fsimpl) {
+        this.kfsImpl = fsimpl;
+    }
+
+    public URI getUri() {
+        return uri;
+    }
+
+    /**
+     * Connect to the KFS meta server named by fs.kfs.metaServerHost /
+     * fs.kfs.metaServerPort, unless a backend was already injected.
+     *
+     * @throws IOException if the backend cannot be initialized.  Library
+     *         code must report failure to the caller, never call
+     *         System.exit() as the previous version did.
+     */
+    public void initialize(URI uri, Configuration conf) throws IOException {
+        try {
+            if (kfsImpl == null) {
+                kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
+                                      conf.getInt("fs.kfs.metaServerPort", -1));
+            }
+            this.localFs = FileSystem.getLocal(conf);
+            this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            // The JNI-backed KFSImpl constructor can fail with unchecked
+            // exceptions (e.g. native library load); surface them instead
+            // of killing the JVM.
+            throw (IOException) new IOException("Unable to initialize KFS")
+                .initCause(e);
+        }
+    }
+
+    @Deprecated
+    public String getName() {
+        return getUri().toString();
+    }
+
+    public Path getWorkingDirectory() {
+        return workingDir;
+    }
+
+    public void setWorkingDirectory(Path dir) {
+        workingDir = makeAbsolute(dir);
+    }
+
+    // Resolve a possibly-relative path against the working directory.
+    private Path makeAbsolute(Path path) {
+        if (path.isAbsolute()) {
+            return path;
+        }
+        return new Path(workingDir, path);
+    }
+
+    public boolean exists(Path path) throws IOException {
+        // stat the path to make sure it exists
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+        return kfsImpl.exists(srep);
+    }
+
+    public boolean mkdirs(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        // backend returns 0 on success, negative on error
+        return kfsImpl.mkdirs(srep) == 0;
+    }
+
+    @Deprecated
+    public boolean isDirectory(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+        return kfsImpl.isDirectory(srep);
+    }
+
+    @Deprecated
+    public boolean isFile(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+        return kfsImpl.isFile(srep);
+    }
+
+    /**
+     * For a file, return its size in bytes; for a directory, the number
+     * of entries (excluding "." and "..").
+     */
+    public long getContentLength(Path path)  throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        if (kfsImpl.isFile(srep))
+            return kfsImpl.filesize(srep);
+
+        String[] entries = kfsImpl.readdir(srep);
+
+        if (entries == null)
+            return 0;
+
+        // KFS readdir() returns "." and ".."; strip them before
+        // passing back to hadoop fs.
+        long numEntries = 0;
+        for (int i = 0; i < entries.length; i++) {
+            if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+                continue;
+            numEntries++;
+        }
+        return numEntries;
+    }
+
+    /**
+     * List the statuses of the entries under path; null if the directory
+     * is missing or empty (after stripping "." and "..").
+     */
+    public FileStatus[] listStatus(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        if (kfsImpl.isFile(srep))
+            return new FileStatus[] { getFileStatus(path) } ;
+
+        String[] entries = kfsImpl.readdir(srep);
+
+        if (entries == null)
+            return null;
+
+        // KFS readdir() returns "." and ".."; strip them before
+        // passing back to hadoop fs.
+        int numEntries = 0;
+        for (int i = 0; i < entries.length; i++) {
+            if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+                continue;
+            numEntries++;
+        }
+        if (numEntries == 0) {
+            return null;
+        }
+
+        FileStatus[] pathEntries = new FileStatus[numEntries];
+        int j = 0;
+        for (int i = 0; i < entries.length; i++) {
+            if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+                continue;
+
+            pathEntries[j] = getFileStatus(new Path(path, entries[i]));
+            j++;
+        }
+        return pathEntries;
+    }
+
+    public FileStatus getFileStatus(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        if (kfsImpl.isDirectory(srep)) {
+            // directories report size 0, replication 1, mtime 0
+            return new FileStatus(0, true, 1, 0, 0, path);
+        } else {
+            return new FileStatus(kfsImpl.filesize(srep), false,
+                                  kfsImpl.getReplication(srep),
+                                  getDefaultBlockSize(),
+                                  kfsImpl.getModificationTime(srep), path);
+        }
+    }
+
+    /**
+     * List the entries under path; null if the directory is missing or
+     * empty (after stripping "." and "..").
+     */
+    public Path[] listPaths(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        if (kfsImpl.isFile(srep))
+            return new Path[] { path } ;
+
+        String[] entries = kfsImpl.readdir(srep);
+
+        if (entries == null)
+            return null;
+
+        // KFS readdir() returns "." and ".."; strip them before
+        // passing back to hadoop fs.
+        int numEntries = 0;
+        for (int i = 0; i < entries.length; i++) {
+            if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+                continue;
+            numEntries++;
+        }
+        if (numEntries == 0) {
+            return null;
+        }
+        Path[] pathEntries = new Path[numEntries];
+        int j = 0;
+        for (int i = 0; i < entries.length; i++) {
+            if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+                continue;
+
+            pathEntries[j] = new Path(path, entries[i]);
+            j++;
+        }
+        return pathEntries;
+
+    }
+
+    public FSDataOutputStream create(Path file, boolean overwrite, int bufferSize,
+                                     short replication, long blockSize, Progressable progress)
+        throws IOException {
+
+        if (exists(file)) {
+            if (overwrite) {
+                delete(file);
+            } else {
+                throw new IOException("File already exists: " + file);
+            }
+        }
+
+        Path parent = file.getParent();
+        if (parent != null && !mkdirs(parent)) {
+            throw new IOException("Mkdirs failed to create " + parent);
+        }
+
+        Path absolute = makeAbsolute(file);
+        String srep = absolute.toUri().getPath();
+
+        return kfsImpl.create(srep, replication, bufferSize);
+    }
+
+    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        if (!exists(path))
+            throw new IOException("File does not exist: " + path);
+
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        return kfsImpl.open(srep, bufferSize);
+    }
+
+    public boolean rename(Path src, Path dst) throws IOException {
+        Path absoluteS = makeAbsolute(src);
+        String srepS = absoluteS.toUri().getPath();
+        Path absoluteD = makeAbsolute(dst);
+        String srepD = absoluteD.toUri().getPath();
+
+        return kfsImpl.rename(srepS, srepD) == 0;
+    }
+
+    // recursively delete the directory and its contents
+    public boolean delete(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        if (kfsImpl.isFile(srep))
+            return kfsImpl.remove(srep) == 0;
+
+        // NOTE(review): listPaths() already returns child paths resolved
+        // against 'absolute'; Path(parent, child) returns the child
+        // unchanged when it is absolute, so this does not double-prefix.
+        Path[] dirEntries = listPaths(absolute);
+        if (dirEntries != null) {
+            for (int i = 0; i < dirEntries.length; i++) {
+                delete(new Path(absolute, dirEntries[i]));
+            }
+        }
+        return kfsImpl.rmdir(srep) == 0;
+    }
+
+    @Deprecated
+    public long getLength(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+        return kfsImpl.filesize(srep);
+    }
+
+    @Deprecated
+    public short getReplication(Path path) throws IOException {
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+        return kfsImpl.getReplication(srep);
+    }
+
+    public short getDefaultReplication() {
+        return 3;
+    }
+
+    public boolean setReplication(Path path, short replication)
+        throws IOException {
+
+        Path absolute = makeAbsolute(path);
+        String srep = absolute.toUri().getPath();
+
+        int res = kfsImpl.setReplication(srep, replication);
+        return res >= 0;
+    }
+
+    // 64MB is the KFS block size
+
+    public long getDefaultBlockSize() {
+        return 1 << 26;
+    }
+
+    // KFS does not support locking; these deprecated hooks are no-ops.
+    @Deprecated
+    public void lock(Path path, boolean shared) throws IOException {
+
+    }
+
+    @Deprecated
+    public void release(Path path) throws IOException {
+
+    }
+
+    /**
+     * Return null if the file doesn't exist; otherwise, get the
+     * locations of the various chunks of the file from KFS.
+     */
+    public String[][] getFileCacheHints(Path f, long start, long len)
+        throws IOException {
+        if (!exists(f)) {
+            return null;
+        }
+        String srep = makeAbsolute(f).toUri().getPath();
+        String[][] hints = kfsImpl.getDataLocation(srep, start, len);
+        return hints;
+    }
+
+    public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+        FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
+    }
+
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+        FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
+    }
+
+    public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+        throws IOException {
+        // output is staged locally and moved into place on completion
+        return tmpLocalFile;
+    }
+
+    public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+        throws IOException {
+        moveFromLocalFile(tmpLocalFile, fsOutputFile);
+    }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html Wed Oct  3 14:17:00 2007
@@ -0,0 +1,80 @@
+<html>
+<head></head>
+<body>
+<h1>A client for the Kosmos filesystem (KFS)</h1>
+
+<h3>Introduction</h3>
+
+This page describes how to use the Kosmos Filesystem 
+(<a href="http://kosmosfs.sourceforge.net"> KFS </a>) as a backing
+store with Hadoop.   This page assumes that you have downloaded the
+KFS software and installed necessary binaries as outlined in the KFS
+documentation.
+
+<h3>Steps</h3>
+
+        <ul>
+          <li>In the Hadoop conf directory edit hadoop-default.xml,
+          add the following:
+            <pre>
+&lt;property&gt;
+  &lt;name&gt;fs.kfs.impl&lt;/name&gt;
+  &lt;value&gt;org.apache.hadoop.fs.kfs.KosmosFileSystem&lt;/value&gt;
+  &lt;description&gt;The FileSystem for kfs: uris.&lt;/description&gt;
+&lt;/property&gt;
+            </pre>
+
+          <li>In the Hadoop conf directory edit hadoop-site.xml,
+          adding the following (with appropriate values for
+          &lt;server&gt; and &lt;port&gt;):
+            <pre>
+&lt;property&gt;
+  &lt;name&gt;fs.default.name&lt;/name&gt;
+  &lt;value&gt;kfs://&lt;server:port&gt;&lt;/value&gt; 
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;fs.kfs.metaServerHost&lt;/name&gt;
+  &lt;value&gt;&lt;server&gt;&lt;/value&gt;
+  &lt;description&gt;The location of the KFS meta server.&lt;/description&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;fs.kfs.metaServerPort&lt;/name&gt;
+  &lt;value&gt;&lt;port&gt;&lt;/value&gt;
+  &lt;description&gt;The location of the meta server's port.&lt;/description&gt;
+&lt;/property&gt;
+
+</pre>
+          </li>
+
+          <li>Copy KFS's <i> kfs-0.1.jar </i> to Hadoop's lib directory.  This step
+          enables Hadoop to load the KFS-specific modules.  Note
+          that, kfs-0.1.jar was built when you compiled KFS source
+          code.  This jar file contains code that calls KFS's client
+          library code via JNI; the native code is in KFS's <i>
+          libkfsClient.so </i> library.
+          </li>
+
+          <li> When the Hadoop map/reduce trackers start up, those
+processes (on local as well as remote nodes) will now need to load
+KFS's <i> libkfsClient.so </i> library.  To simplify this process, it is advisable to
+store libkfsClient.so in an NFS accessible directory (similar to where
+Hadoop binaries/scripts are stored); then, modify Hadoop's
+conf/hadoop-env.sh adding the following line and providing suitable
+value for &lt;path&gt;:
+<pre>
+export LD_LIBRARY_PATH=&lt;path&gt;
+</pre>
+          </li>
+
+          <li>Start only the map/reduce trackers
+          <br />
+          example: execute Hadoop's bin/start-mapred.sh</li>
+        </ul>
+<br/>
+
+Once the map/reduce trackers are up, all file I/O will go to KFS.
+
+</body>
+</html>

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * We need to provide the ability to the code in fs/kfs without really
+ * having a KFS deployment.  For this purpose, use the LocalFileSystem
+ * as a way to "emulate" KFS.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * An IFSImpl backend that emulates KFS on top of the LocalFileSystem,
+ * so the fs/kfs code can be tested without a real KFS deployment.
+ * Mirrors KFS quirks: readdir() includes "." and "..", and the
+ * int-returning operations use 0 for success and -1 for failure.
+ */
+public class KFSEmulationImpl implements IFSImpl {
+    FileSystem localFS;
+
+    public KFSEmulationImpl(Configuration conf) throws IOException {
+        localFS = FileSystem.getLocal(conf);
+    }
+
+    public boolean exists(String path) throws IOException {
+        return localFS.exists(new Path(path));
+    }
+    public boolean isDirectory(String path) throws IOException {
+        return localFS.isDirectory(new Path(path));
+    }
+    public boolean isFile(String path) throws IOException {
+        return localFS.isFile(new Path(path));
+    }
+
+    // as part of the emulation, KFS adds ./.. as directory entries
+    // when doing a directory listing.
+    public String[] readdir(String path) throws IOException {
+        Path[] p = localFS.listPaths(new Path(path));
+        String[] entries = null;
+
+        if (p == null) {
+            if (isDirectory(path)) {
+                // empty dirs have "." and ".."
+                entries = new String[] { ".", ".." };
+            }
+            return entries;
+        }
+
+        if (isDirectory(path)) {
+            // for dirs, add "."/".." as KFS does that
+            entries = new String[p.length + 2];
+            entries[0] = ".";
+            entries[1] = "..";
+            for (int i = 0; i < p.length; i++)
+                entries[i+2] = p[i].toString();
+        } else {
+            entries = new String[p.length];
+            for (int i = 0; i < p.length; i++)
+                entries[i] = p[i].toString();
+        }
+        return entries;
+    }
+
+    public int mkdirs(String path) throws IOException {
+        if (localFS.mkdirs(new Path(path)))
+            return 0;
+
+        return -1;
+    }
+
+    public int rename(String source, String dest) throws IOException {
+        if (localFS.rename(new Path(source), new Path(dest)))
+            return 0;
+        return -1;
+    }
+
+    public int rmdir(String path) throws IOException {
+        if (isDirectory(path)) {
+            // the directory better be empty: readdir() on an empty dir
+            // returns exactly the two synthetic entries "." and ".."
+            String[] dirEntries = readdir(path);
+            if ((dirEntries.length <= 2) && (localFS.delete(new Path(path))))
+                return 0;
+        }
+        return -1;
+    }
+
+    public int remove(String path) throws IOException {
+        if (isFile(path) && (localFS.delete(new Path(path))))
+            return 0;
+        return -1;
+    }
+
+    public long filesize(String path) throws IOException {
+        return localFS.getLength(new Path(path));
+    }
+    // replication is meaningless on the local FS; report 1 everywhere
+    public short getReplication(String path) throws IOException {
+        return 1;
+    }
+    public short setReplication(String path, short replication) throws IOException {
+        return 1;
+    }
+    public String[][] getDataLocation(String path, long start, long len) throws IOException {
+        return localFS.getFileCacheHints(new Path(path), start, len);
+    }
+
+    public long getModificationTime(String path) throws IOException {
+        FileStatus s = localFS.getFileStatus(new Path(path));
+        if (s == null)
+            return 0;
+
+        return s.getModificationTime();
+    }
+
+    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+        // besides path/overwrite, the other args don't matter for
+        // testing purposes.
+        return localFS.create(new Path(path));
+    }
+
+    public FSDataInputStream open(String path, int bufferSize) throws IOException {
+        return localFS.open(new Path(path));
+    }
+
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Wed Oct  3 14:17:00 2007
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ * 
+ * Unit tests for testing the KosmosFileSystem API implementation.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.fs.kfs.KosmosFileSystem;
+
+/**
+ * Unit tests for the KosmosFileSystem API implementation, run against
+ * the local-filesystem-backed KFS emulation (no KFS deployment needed).
+ */
+public class TestKosmosFileSystem extends TestCase {
+
+    KosmosFileSystem kosmosFileSystem;
+    KFSEmulationImpl kfsEmul;
+
+    @Override
+    protected void setUp() throws IOException {
+        Configuration conf = new Configuration();
+
+        kfsEmul = new KFSEmulationImpl(conf);
+        kosmosFileSystem = new KosmosFileSystem(kfsEmul);
+        // a dummy URI; we are not connecting to any setup here
+        kosmosFileSystem.initialize(URI.create("kfs:///"), conf);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+
+    }
+
+    // Check all the directory API's in KFS
+    public void testDirs() throws Exception {
+        Path baseDir = new Path("/tmp/test/kfs-test");
+        Path subDir1 = new Path("dir.1");
+
+        // make the dir
+        kosmosFileSystem.mkdirs(baseDir);
+        assertTrue(kosmosFileSystem.isDirectory(baseDir));
+        kosmosFileSystem.setWorkingDirectory(baseDir);
+
+        kosmosFileSystem.mkdirs(subDir1);
+        assertTrue(kosmosFileSystem.isDirectory(subDir1));
+
+        assertFalse(kosmosFileSystem.exists(new Path("test1")));
+        assertFalse(kosmosFileSystem.isDirectory(new Path("test/dir.2")));
+
+        // JUnit convention: expected value first, actual second
+        Path[] p = kosmosFileSystem.listPaths(baseDir);
+        assertEquals(1, p.length);
+
+        kosmosFileSystem.delete(baseDir);
+        assertFalse(kosmosFileSystem.exists(baseDir));
+    }
+
+    // Check the file API's
+    public void testFiles() throws Exception {
+        Path baseDir = new Path("/tmp/test/kfs-test");
+        Path subDir1 = new Path("dir.1");
+        Path file1 = new Path("dir.1/foo.1");
+        Path file2 = new Path("dir.1/foo.2");
+
+        kosmosFileSystem.mkdirs(baseDir);
+        assertTrue(kosmosFileSystem.isDirectory(baseDir));
+        kosmosFileSystem.setWorkingDirectory(baseDir);
+
+        kosmosFileSystem.mkdirs(subDir1);
+
+        FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
+        FSDataOutputStream s2 = kosmosFileSystem.create(file2, true, 4096, (short) 1, (long) 4096, null);
+
+        s1.close();
+        s2.close();
+
+        Path[] p = kosmosFileSystem.listPaths(subDir1);
+        assertEquals(2, p.length);
+
+        kosmosFileSystem.delete(file1);
+        p = kosmosFileSystem.listPaths(subDir1);
+        assertEquals(1, p.length);
+
+        kosmosFileSystem.delete(file2);
+        // listPaths() returns null for an empty directory
+        p = kosmosFileSystem.listPaths(subDir1);
+        assertNull(p);
+
+        kosmosFileSystem.delete(baseDir);
+        assertFalse(kosmosFileSystem.exists(baseDir));
+    }
+
+    // Check file read/write round-trip
+    public void testFileIO() throws Exception {
+        Path baseDir = new Path("/tmp/test/kfs-test");
+        Path subDir1 = new Path("dir.1");
+        Path file1 = new Path("dir.1/foo.1");
+
+        kosmosFileSystem.mkdirs(baseDir);
+        assertTrue(kosmosFileSystem.isDirectory(baseDir));
+        kosmosFileSystem.setWorkingDirectory(baseDir);
+
+        kosmosFileSystem.mkdirs(subDir1);
+
+        FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
+
+        int bufsz = 4096;
+        byte[] data = new byte[bufsz];
+
+        for (int i = 0; i < data.length; i++)
+            data[i] = (byte) (i % 16);
+
+        // write a single byte
+        s1.write(32);
+        // write some data
+        s1.write(data, 0, data.length);
+        // flush out the changes
+        s1.close();
+
+        // Read the stuff back and verify it is correct
+        FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
+        int v;
+
+        v = s2.read();
+        assertEquals(32, v);
+
+        assertEquals(data.length, s2.available());
+
+        byte[] buf = new byte[bufsz];
+        s2.read(buf, 0, buf.length);
+        for (int i = 0; i < data.length; i++)
+            assertEquals(data[i], buf[i]);
+
+        assertEquals(0, s2.available());
+
+        s2.close();
+
+        kosmosFileSystem.delete(file1);
+        assertFalse(kosmosFileSystem.exists(file1));
+        kosmosFileSystem.delete(subDir1);
+        assertFalse(kosmosFileSystem.exists(subDir1));
+        kosmosFileSystem.delete(baseDir);
+        assertFalse(kosmosFileSystem.exists(baseDir));
+    }
+
+}