You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/10/03 23:17:04 UTC
svn commit: r581725 - in /lucene/hadoop/trunk: ./ conf/ lib/
src/java/org/apache/hadoop/fs/kfs/ src/test/org/apache/hadoop/fs/kfs/
Author: cutting
Date: Wed Oct 3 14:17:00 2007
New Revision: 581725
URL: http://svn.apache.org/viewvc?rev=581725&view=rev
Log:
HADOOP-1963. Add a FileSystem implementation for the Kosmos Filesystem (KFS). Contributed by Sriram Rao.
Added:
lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt
lucene/hadoop/trunk/lib/kfs-0.1.jar (with props)
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581725&r1=581724&r2=581725&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 3 14:17:00 2007
@@ -74,6 +74,8 @@
and codec, independent of the final output's compression
parameters. (Arun C Murthy via cutting)
+ HADOOP-1963. Add a FileSystem implementation for the Kosmos
+ Filesystem (KFS). (Sriram Rao via cutting)
OPTIMIZATIONS
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581725&r1=581724&r2=581725&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct 3 14:17:00 2007
@@ -145,6 +145,12 @@
</property>
<property>
+ <name>fs.kfs.impl</name>
+ <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
+ <description>The FileSystem for kfs: uris.</description>
+</property>
+
+<property>
<name>fs.hftp.impl</name>
<value>org.apache.hadoop.dfs.HftpFileSystem</value>
</property>
Added: lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt (added)
+++ lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt Wed Oct 3 14:17:00 2007
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
Added: lucene/hadoop/trunk/lib/kfs-0.1.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.jar?rev=581725&view=auto
==============================================================================
Binary file - no diff available.
Propchange: lucene/hadoop/trunk/lib/kfs-0.1.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * We need to provide the ability to the code in fs/kfs without really
+ * having a KFS deployment. In particular, the glue code that wraps
+ * around calls to KfsAccess object. This is accomplished by defining a
+ * filesystem implementation interface:
+ * -- for testing purposes, a dummy implementation of this interface
+ * will suffice; as long as the dummy implementation is close enough
+ * to doing what KFS does, we are good.
+ * -- for deployment purposes with KFS, this interface is
+ * implemented by the KfsImpl object.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+interface IFSImpl {
+ public boolean exists(String path) throws IOException;
+ public boolean isDirectory(String path) throws IOException;
+ public boolean isFile(String path) throws IOException;
+ public String[] readdir(String path) throws IOException;
+
+ public int mkdirs(String path) throws IOException;
+ public int rename(String source, String dest) throws IOException;
+
+ public int rmdir(String path) throws IOException;
+ public int remove(String path) throws IOException;
+ public long filesize(String path) throws IOException;
+ public short getReplication(String path) throws IOException;
+ public short setReplication(String path, short replication) throws IOException;
+ public String[][] getDataLocation(String path, long start, long len) throws IOException;
+
+ public long getModificationTime(String path) throws IOException;
+ public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException;
+ public FSDataInputStream open(String path, int bufferSize) throws IOException;
+
+};
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,104 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * Provide the implementation of KFS which turn into calls to KfsAccess.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+
+class KFSImpl implements IFSImpl {
+ private KfsAccess kfsAccess = null;
+
+ public KFSImpl(String metaServerHost, int metaServerPort) throws IOException {
+ kfsAccess = new KfsAccess(metaServerHost, metaServerPort);
+ }
+
+ public boolean exists(String path) throws IOException {
+ return kfsAccess.kfs_exists(path);
+ }
+
+ public boolean isDirectory(String path) throws IOException {
+ return kfsAccess.kfs_isDirectory(path);
+ }
+
+ public boolean isFile(String path) throws IOException {
+ return kfsAccess.kfs_isFile(path);
+ }
+
+ public String[] readdir(String path) throws IOException {
+ return kfsAccess.kfs_readdir(path);
+ }
+
+ public int mkdirs(String path) throws IOException {
+ return kfsAccess.kfs_mkdirs(path);
+ }
+
+ public int rename(String source, String dest) throws IOException {
+ return kfsAccess.kfs_rename(source, dest);
+ }
+
+ public int rmdir(String path) throws IOException {
+ return kfsAccess.kfs_rmdir(path);
+ }
+
+ public int remove(String path) throws IOException {
+ return kfsAccess.kfs_remove(path);
+ }
+
+ public long filesize(String path) throws IOException {
+ return kfsAccess.kfs_filesize(path);
+ }
+
+ public short getReplication(String path) throws IOException {
+ return kfsAccess.kfs_getReplication(path);
+ }
+
+ public short setReplication(String path, short replication) throws IOException {
+ return kfsAccess.kfs_setReplication(path, replication);
+ }
+
+ public String[][] getDataLocation(String path, long start, long len) throws IOException {
+ return kfsAccess.kfs_getDataLocation(path, start, len);
+ }
+
+ public long getModificationTime(String path) throws IOException {
+ // Supporting this API requires changes to the Java-side of
+ // the KFS client API. For now, return 0; in the next rev of
+ // KFS, we'll update the Java API.
+ return 0;
+ }
+
+ public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+ return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication));
+ }
+
+ public FSDataInputStream open(String path, int bufferSize) throws IOException {
+ return new FSDataInputStream(new KFSInputStream(kfsAccess, path));
+ }
+};
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,123 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * Implements the Hadoop FSInputStream interfaces to allow applications to read
+ * files in Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+import org.kosmix.kosmosfs.access.KfsInputChannel;
+
+class KFSInputStream extends FSInputStream {
+
+ private String path;
+ private KfsInputChannel kfsChannel;
+
+ private long fsize;
+
+ public KFSInputStream(KfsAccess kfsAccess, String path) {
+ this.path = path;
+
+ this.kfsChannel = kfsAccess.kfs_open(path);
+ if (this.kfsChannel != null)
+ this.fsize = kfsAccess.kfs_filesize(path);
+ else
+ this.fsize = 0;
+ }
+
+ public long getPos() throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ return kfsChannel.tell();
+ }
+
+ public synchronized int available() throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ return (int) (this.fsize - getPos());
+ }
+
+ public synchronized void seek(long targetPos) throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ kfsChannel.seek(targetPos);
+ }
+
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ public synchronized int read() throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ byte b[] = new byte[4];
+ int res = read(b, 0, 4);
+ if (res == 4)
+ return (b[0] + (b[1] << 8) + (b[2] << 16) + (b[3] << 24));
+ return -1;
+ }
+
+ public synchronized int read(byte b[], int off, int len) throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ int res;
+
+ res = kfsChannel.read(ByteBuffer.wrap(b, off, len));
+ // Use -1 to signify EOF
+ if (res == 0)
+ return -1;
+ return res;
+ }
+
+ public synchronized void close() throws IOException {
+ if (kfsChannel == null) {
+ return;
+ }
+
+ kfsChannel.close();
+ kfsChannel = null;
+ }
+
+ public boolean markSupported() {
+ return false;
+ }
+
+ public void mark(int readLimit) {
+ // Do nothing
+ }
+
+ public void reset() throws IOException {
+ throw new IOException("Mark not supported");
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
+ * files in Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.util.Progressable;
+
+import org.kosmix.kosmosfs.access.KfsAccess;
+import org.kosmix.kosmosfs.access.KfsOutputChannel;
+
+class KFSOutputStream extends OutputStream {
+
+ private String path;
+ private KfsOutputChannel kfsChannel;
+
+ public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) {
+ this.path = path;
+
+ this.kfsChannel = kfsAccess.kfs_create(path, replication);
+ }
+
+ public long getPos() throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ return kfsChannel.tell();
+ }
+
+ public void write(int v) throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ byte[] b = new byte[4];
+
+ b[0] = (byte) (v & 0xFF);
+ b[1] = (byte) ((v >> 8) & 0xFF);
+ b[1] = (byte) ((v >> 16) & 0xFF);
+ b[1] = (byte) ((v >> 24) & 0xFF);
+ write(b, 0, 4);
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+
+ kfsChannel.write(ByteBuffer.wrap(b, off, len));
+ }
+
+ public void flush() throws IOException {
+ if (kfsChannel == null) {
+ throw new IOException("File closed");
+ }
+ kfsChannel.sync();
+ }
+
+ public synchronized void close() throws IOException {
+ if (kfsChannel == null) {
+ return;
+ }
+ flush();
+ kfsChannel.close();
+ kfsChannel = null;
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,383 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * Implements the Hadoop FS interfaces to allow applications to store
+ *files in Kosmos File System (KFS).
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A FileSystem backed by KFS.
+ *
+ */
+
+public class KosmosFileSystem extends FileSystem {
+
+ private FileSystem localFs;
+ private IFSImpl kfsImpl = null;
+ private URI uri;
+ private Path workingDir = new Path("/");
+
+ public KosmosFileSystem() {
+
+ }
+
+ KosmosFileSystem(IFSImpl fsimpl) {
+ this.kfsImpl = fsimpl;
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+
+ try {
+ if (kfsImpl == null) {
+ kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
+ conf.getInt("fs.kfs.metaServerPort", -1));
+ }
+ this.localFs = FileSystem.getLocal(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Unable to initialize KFS");
+ System.exit(-1);
+ }
+ }
+
+ @Deprecated
+ public String getName() {
+ return getUri().toString();
+ }
+
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ public void setWorkingDirectory(Path dir) {
+ workingDir = makeAbsolute(dir);
+ }
+
+ private Path makeAbsolute(Path path) {
+ if (path.isAbsolute()) {
+ return path;
+ }
+ return new Path(workingDir, path);
+ }
+
+ public boolean exists(Path path) throws IOException {
+ // stat the path to make sure it exists
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+ return kfsImpl.exists(srep);
+ }
+
+ public boolean mkdirs(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ int res;
+
+ // System.out.println("Calling mkdirs on: " + srep);
+
+ res = kfsImpl.mkdirs(srep);
+
+ return res == 0;
+ }
+
+ @Deprecated
+ public boolean isDirectory(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ // System.out.println("Calling isdir on: " + srep);
+
+ return kfsImpl.isDirectory(srep);
+ }
+
+ @Deprecated
+ public boolean isFile(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+ return kfsImpl.isFile(srep);
+ }
+
+ public long getContentLength(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ if (kfsImpl.isFile(srep))
+ return kfsImpl.filesize(srep);
+
+ String[] entries = kfsImpl.readdir(srep);
+
+ if (entries == null)
+ return 0;
+
+ // kfsreaddir() returns "." and ".."; strip them before
+ // passing back to hadoop fs.
+ long numEntries = 0;
+ for (int i = 0; i < entries.length; i++) {
+ if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+ continue;
+ numEntries++;
+ }
+ return numEntries;
+ }
+
+ public FileStatus[] listStatus(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ if (kfsImpl.isFile(srep))
+ return new FileStatus[] { getFileStatus(path) } ;
+
+ String[] entries = kfsImpl.readdir(srep);
+
+ if (entries == null)
+ return null;
+
+ // kfsreaddir() returns "." and ".."; strip them before
+ // passing back to hadoop fs.
+ int numEntries = 0;
+ for (int i = 0; i < entries.length; i++) {
+ if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+ continue;
+ numEntries++;
+ }
+ if (numEntries == 0) {
+ return null;
+ }
+
+ // System.out.println("Calling listStatus on: " + path);
+
+ FileStatus[] pathEntries = new FileStatus[numEntries];
+ int j = 0;
+ for (int i = 0; i < entries.length; i++) {
+ if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+ continue;
+
+ pathEntries[j] = getFileStatus(new Path(path, entries[i]));
+ j++;
+ }
+ return pathEntries;
+ }
+
+ public FileStatus getFileStatus(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ if (kfsImpl.isDirectory(srep)) {
+ // System.out.println("Status of path: " + path + " is dir");
+ return new FileStatus(0, true, 1, 0, 0, path);
+ } else {
+ // System.out.println("Status of path: " + path + " is file");
+ return new FileStatus(kfsImpl.filesize(srep), false,
+ kfsImpl.getReplication(srep),
+ getDefaultBlockSize(),
+ kfsImpl.getModificationTime(srep), path);
+ }
+ }
+
+
+ public Path[] listPaths(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ if (kfsImpl.isFile(srep))
+ return new Path[] { path } ;
+
+ String[] entries = kfsImpl.readdir(srep);
+
+ if (entries == null)
+ return null;
+
+ // kfsreaddir() returns "." and ".."; strip them before
+ // passing back to hadoop fs.
+ int numEntries = 0;
+ for (int i = 0; i < entries.length; i++) {
+ if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+ continue;
+ numEntries++;
+ }
+ if (numEntries == 0) {
+ return null;
+ }
+ Path[] pathEntries = new Path[numEntries];
+ int j = 0;
+ for (int i = 0; i < entries.length; i++) {
+ if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0))
+ continue;
+
+ pathEntries[j] = new Path(path, entries[i]);
+ j++;
+ }
+ return pathEntries;
+
+ }
+
+ public FSDataOutputStream create(Path file, boolean overwrite, int bufferSize,
+ short replication, long blockSize, Progressable progress)
+ throws IOException {
+
+ if (exists(file)) {
+ if (overwrite) {
+ delete(file);
+ } else {
+ throw new IOException("File already exists: " + file);
+ }
+ }
+
+ Path parent = file.getParent();
+ if (parent != null && !mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " + parent);
+ }
+
+ Path absolute = makeAbsolute(file);
+ String srep = absolute.toUri().getPath();
+
+ return kfsImpl.create(srep, replication, bufferSize);
+ }
+
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ if (!exists(path))
+ throw new IOException("File does not exist: " + path);
+
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ return kfsImpl.open(srep, bufferSize);
+ }
+
+ public boolean rename(Path src, Path dst) throws IOException {
+ Path absoluteS = makeAbsolute(src);
+ String srepS = absoluteS.toUri().getPath();
+ Path absoluteD = makeAbsolute(dst);
+ String srepD = absoluteD.toUri().getPath();
+
+ // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
+
+ return kfsImpl.rename(srepS, srepD) == 0;
+ }
+
+ // recursively delete the directory and its contents
+ public boolean delete(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ if (kfsImpl.isFile(srep))
+ return kfsImpl.remove(srep) == 0;
+
+ Path[] dirEntries = listPaths(absolute);
+ if (dirEntries != null) {
+ for (int i = 0; i < dirEntries.length; i++) {
+ delete(new Path(absolute, dirEntries[i]));
+ }
+ }
+ return kfsImpl.rmdir(srep) == 0;
+ }
+
+ @Deprecated
+ public long getLength(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+ return kfsImpl.filesize(srep);
+ }
+
+ @Deprecated
+ public short getReplication(Path path) throws IOException {
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+ return kfsImpl.getReplication(srep);
+ }
+
+ public short getDefaultReplication() {
+ return 3;
+ }
+
+ public boolean setReplication(Path path, short replication)
+ throws IOException {
+
+ Path absolute = makeAbsolute(path);
+ String srep = absolute.toUri().getPath();
+
+ int res = kfsImpl.setReplication(srep, replication);
+ return res >= 0;
+ }
+
+ // 64MB is the KFS block size
+
+ public long getDefaultBlockSize() {
+ return 1 << 26;
+ }
+
+ @Deprecated
+ public void lock(Path path, boolean shared) throws IOException {
+
+ }
+
+ @Deprecated
+ public void release(Path path) throws IOException {
+
+ }
+
+ /**
+ * Return null if the file doesn't exist; otherwise, get the
+ * locations of the various chunks of the file file from KFS.
+ */
+ public String[][] getFileCacheHints(Path f, long start, long len)
+ throws IOException {
+ if (!exists(f)) {
+ return null;
+ }
+ String srep = makeAbsolute(f).toUri().getPath();
+ String[][] hints = kfsImpl.getDataLocation(srep, start, len);
+ return hints;
+ }
+
+ public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
+ }
+
+ public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
+ FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
+ }
+
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return tmpLocalFile;
+ }
+
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ moveFromLocalFile(tmpLocalFile, fsOutputFile);
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html Wed Oct 3 14:17:00 2007
@@ -0,0 +1,80 @@
+<html>
+<head></head>
+<body>
+<h1>A client for the Kosmos filesystem (KFS)</h1>
+
+<h3>Introduction</h3>
+
+This pages describes how to use Kosmos Filesystem
+(<a href="http://kosmosfs.sourceforge.net"> KFS </a>) as a backing
+store with Hadoop. This page assumes that you have downloaded the
+KFS software and installed necessary binaries as outlined in the KFS
+documentation.
+
+<h3>Steps</h3>
+
+ <ul>
+ <li>In the Hadoop conf directory edit hadoop-default.xml,
+ add the following:
+ <pre>
+<property>
+ <name>fs.kfs.impl</name>
+ <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
+ <description>The FileSystem for kfs: uris.</description>
+</property>
+ </pre>
+
+ <li>In the Hadoop conf directory edit hadoop-site.xml,
+ adding the following (with appropriate values for
+ <server> and <port>):
+ <pre>
+<property>
+ <name>fs.default.name</name>
+ <value>kfs://<server:port></value>
+</property>
+
+<property>
+ <name>fs.kfs.metaServerHost</name>
+ <value><server></value>
+ <description>The location of the KFS meta server.</description>
+</property>
+
+<property>
+ <name>fs.kfs.metaServerPort</name>
+ <value><port></value>
+ <description>The location of the meta server's port.</description>
+</property>
+
+</pre>
+ </li>
+
+ <li>Copy KFS's <i> kfs-0.1.jar </i> to Hadoop's lib directory. This step
+ enables Hadoop's to load the KFS specific modules. Note
+ that, kfs-0.1.jar was built when you compiled KFS source
+ code. This jar file contains code that calls KFS's client
+ library code via JNI; the native code is in KFS's <i>
+ libkfsClient.so </i> library.
+ </li>
+
+ <li> When the Hadoop map/reduce trackers start up, those
+processes (on local as well as remote nodes) will now need to load
+KFS's <i> libkfsClient.so </i> library. To simplify this process, it is advisable to
+store libkfsClient.so in an NFS accessible directory (similar to where
+Hadoop binaries/scripts are stored); then, modify Hadoop's
+conf/hadoop-env.sh adding the following line and providing suitable
+value for <path>:
+<pre>
+export LD_LIBRARY_PATH=<path>
+</pre>
+
+
+ <li>Start only the map/reduce trackers
+ <br />
+ example: execute Hadoop's bin/start-mapred.sh</li>
+ </ul>
+<br/>
+
+If the map/reduce job trackers start up, all file-I/O is done to KFS.
+
+</body>
+</html>
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,145 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * We need to provide the ability to the code in fs/kfs without really
+ * having a KFS deployment. For this purpose, use the LocalFileSystem
+ * as a way to "emulate" KFS.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+
+public class KFSEmulationImpl implements IFSImpl {
+ FileSystem localFS;
+
+ public KFSEmulationImpl(Configuration conf) throws IOException {
+ localFS = FileSystem.getLocal(conf);
+ }
+
+ public boolean exists(String path) throws IOException {
+ return localFS.exists(new Path(path));
+ }
+ public boolean isDirectory(String path) throws IOException {
+ return localFS.isDirectory(new Path(path));
+ }
+ public boolean isFile(String path) throws IOException {
+ return localFS.isFile(new Path(path));
+ }
+
+ // as part of the emulation, KFS adds ./.. as directory entries
+ // when doing a directory listing.
+ public String[] readdir(String path) throws IOException {
+ Path[] p = localFS.listPaths(new Path(path));
+ String[] entries = null;
+
+ if (p == null) {
+ if (isDirectory(path)) {
+ // empty dirs have "." and ".."
+ entries = new String[2];
+ entries[0] = new String(".");
+ entries[1] = new String("..");
+ }
+ return entries;
+ }
+
+ if (isDirectory(path)) {
+ // for dirs, add "."/".." as KFS does that
+ entries = new String[p.length + 2];
+ entries[0] = new String(".");
+ entries[1] = new String("..");
+ for (int i = 0; i < p.length; i++)
+ entries[i+2] = p[i].toString();
+ } else {
+ entries = new String[p.length];
+ for (int i = 0; i < p.length; i++)
+ entries[i] = p[i].toString();
+ }
+ return entries;
+ }
+
+ public int mkdirs(String path) throws IOException {
+ if (localFS.mkdirs(new Path(path)))
+ return 0;
+
+ return -1;
+ }
+
+ public int rename(String source, String dest) throws IOException {
+ if (localFS.rename(new Path(source), new Path(dest)))
+ return 0;
+ return -1;
+ }
+
+ public int rmdir(String path) throws IOException {
+ if (isDirectory(path)) {
+ // the directory better be empty
+ String[] dirEntries = readdir(path);
+ if ((dirEntries.length <= 2) && (localFS.delete(new Path(path))))
+ return 0;
+ }
+ return -1;
+ }
+
+ public int remove(String path) throws IOException {
+ if (isFile(path) && (localFS.delete(new Path(path))))
+ return 0;
+ return -1;
+ }
+
+ public long filesize(String path) throws IOException {
+ return localFS.getLength(new Path(path));
+ }
+ public short getReplication(String path) throws IOException {
+ return 1;
+ }
+ public short setReplication(String path, short replication) throws IOException {
+ return 1;
+ }
+ public String[][] getDataLocation(String path, long start, long len) throws IOException {
+ return localFS.getFileCacheHints(new Path(path), start, len);
+ }
+
+ public long getModificationTime(String path) throws IOException {
+ FileStatus s = localFS.getFileStatus(new Path(path));
+ if (s == null)
+ return 0;
+
+ return s.getModificationTime();
+ }
+
+ public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
+ // besides path/overwrite, the other args don't matter for
+ // testing purposes.
+ return localFS.create(new Path(path));
+ }
+
+ public FSDataInputStream open(String path, int bufferSize) throws IOException {
+ return localFS.open(new Path(path));
+ }
+
+
+};
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=581725&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Wed Oct 3 14:17:00 2007
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * @author: Sriram Rao (Kosmix Corp.)
+ *
+ * Unit tests for testing the KosmosFileSystem API implementation.
+ */
+
+package org.apache.hadoop.fs.kfs;
+
+import java.io.*;
+import java.net.*;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.fs.kfs.KosmosFileSystem;
+
+public class TestKosmosFileSystem extends TestCase {
+
+ KosmosFileSystem kosmosFileSystem;
+ KFSEmulationImpl kfsEmul;
+
+ @Override
+ protected void setUp() throws IOException {
+ Configuration conf = new Configuration();
+
+ kfsEmul = new KFSEmulationImpl(conf);
+ kosmosFileSystem = new KosmosFileSystem(kfsEmul);
+ // a dummy URI; we are not connecting to any setup here
+ kosmosFileSystem.initialize(URI.create("kfs:///"), conf);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ }
+
+ // @Test
+ // Check all the directory API's in KFS
+ public void testDirs() throws Exception {
+ Path baseDir = new Path("/tmp/test/kfs-test");
+ Path subDir1 = new Path("dir.1");
+
+ // make the dir
+ kosmosFileSystem.mkdirs(baseDir);
+ assertTrue(kosmosFileSystem.isDirectory(baseDir));
+ kosmosFileSystem.setWorkingDirectory(baseDir);
+
+ kosmosFileSystem.mkdirs(subDir1);
+ assertTrue(kosmosFileSystem.isDirectory(subDir1));
+
+ assertFalse(kosmosFileSystem.exists(new Path("test1")));
+ assertFalse(kosmosFileSystem.isDirectory(new Path("test/dir.2")));
+
+ Path[] p = kosmosFileSystem.listPaths(baseDir);
+ assertEquals(p.length, 1);
+
+ kosmosFileSystem.delete(baseDir);
+ assertFalse(kosmosFileSystem.exists(baseDir));
+ }
+
+ // @Test
+ // Check the file API's
+ public void testFiles() throws Exception {
+ Path baseDir = new Path("/tmp/test/kfs-test");
+ Path subDir1 = new Path("dir.1");
+ Path file1 = new Path("dir.1/foo.1");
+ Path file2 = new Path("dir.1/foo.2");
+
+ kosmosFileSystem.mkdirs(baseDir);
+ assertTrue(kosmosFileSystem.isDirectory(baseDir));
+ kosmosFileSystem.setWorkingDirectory(baseDir);
+
+ kosmosFileSystem.mkdirs(subDir1);
+
+ FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
+ FSDataOutputStream s2 = kosmosFileSystem.create(file2, true, 4096, (short) 1, (long) 4096, null);
+
+ s1.close();
+ s2.close();
+
+ Path[] p = kosmosFileSystem.listPaths(subDir1);
+ assertEquals(p.length, 2);
+
+ kosmosFileSystem.delete(file1);
+ p = kosmosFileSystem.listPaths(subDir1);
+ assertEquals(p.length, 1);
+
+ kosmosFileSystem.delete(file2);
+ p = kosmosFileSystem.listPaths(subDir1);
+ assertEquals(p, null);
+
+ kosmosFileSystem.delete(baseDir);
+ assertFalse(kosmosFileSystem.exists(baseDir));
+ }
+
+ // @Test
+ // Check file/read write
+ public void testFileIO() throws Exception {
+ Path baseDir = new Path("/tmp/test/kfs-test");
+ Path subDir1 = new Path("dir.1");
+ Path file1 = new Path("dir.1/foo.1");
+
+ kosmosFileSystem.mkdirs(baseDir);
+ assertTrue(kosmosFileSystem.isDirectory(baseDir));
+ kosmosFileSystem.setWorkingDirectory(baseDir);
+
+ kosmosFileSystem.mkdirs(subDir1);
+
+ FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
+
+ int bufsz = 4096;
+ byte[] data = new byte[bufsz];
+
+ for (int i = 0; i < data.length; i++)
+ data[i] = (byte) (i % 16);
+
+ // write an integer
+ s1.write(32);
+ // write some data
+ s1.write(data, 0, data.length);
+ // flush out the changes
+ s1.close();
+
+ // Read the stuff back and verify it is correct
+ FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
+ int v;
+
+ v = s2.read();
+ assertEquals(v, 32);
+
+ assertEquals(s2.available(), data.length);
+
+ byte[] buf = new byte[bufsz];
+ s2.read(buf, 0, buf.length);
+ for (int i = 0; i < data.length; i++)
+ assertEquals(data[i], buf[i]);
+
+ assertEquals(s2.available(), 0);
+
+ s2.close();
+
+ kosmosFileSystem.delete(file1);
+ assertFalse(kosmosFileSystem.exists(file1));
+ kosmosFileSystem.delete(subDir1);
+ assertFalse(kosmosFileSystem.exists(subDir1));
+ kosmosFileSystem.delete(baseDir);
+ assertFalse(kosmosFileSystem.exists(baseDir));
+ }
+
+}