You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/29 06:32:00 UTC
svn commit: r690096 [15/15] - in /hadoop/core/trunk: ./
src/contrib/thriftfs/ src/contrib/thriftfs/gen-cocoa/
src/contrib/thriftfs/gen-cpp/ src/contrib/thriftfs/gen-java/
src/contrib/thriftfs/gen-java/org/
src/contrib/thriftfs/gen-java/org/apache/ src/...
Added: hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java?rev=690096&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java Thu Aug 28 21:31:57 2008
@@ -0,0 +1,616 @@
+package org.apache.hadoop.thriftfs;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TApplicationException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.transport.TServerSocket;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+
+// Include Generated code
+import org.apache.hadoop.thriftfs.api.*;
+import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
+
+import java.io.*;
+import java.util.*;
+import java.net.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * ThriftHadoopFileSystem
+ * A thrift wrapper around the Hadoop File System
+ */
+public class HadoopThriftServer extends ThriftHadoopFileSystem {
+
+ static int serverPort = 0; // default port
+ TServer server = null;
+
+ public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
+ {
+
+ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
+
+ // HDFS glue
+ Configuration conf;
+ FileSystem fs;
+
+ // stucture that maps each Thrift object into an hadoop object
+ private long nextId = new Random().nextLong();
+ private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
+ private Daemon inactivityThread = null;
+
+ // Detect inactive session
+ private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
+ private static volatile long inactivityRecheckInterval = 60 * 1000;
+ private static volatile boolean fsRunning = true;
+ private static long now;
+
+ // allow outsider to change the hadoopthrift path
+ public void setOption(String key, String val) {
+ }
+
+ /**
+ * Current system time.
+ * @return current time in msec.
+ */
+ static long now() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * getVersion
+ *
+ * @return current version of the interface.
+ */
+ public String getVersion() {
+ return "0.1";
+ }
+
+ /**
+ * shutdown
+ *
+ * cleanly closes everything and exit.
+ */
+ public void shutdown(int status) {
+ LOG.info("HadoopThriftServer shutting down.");
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.warn("Unable to close file system");
+ }
+ Runtime.getRuntime().exit(status);
+ }
+
+ /**
+ * Periodically checks to see if there is inactivity
+ */
+ class InactivityMonitor implements Runnable {
+ public void run() {
+ while (fsRunning) {
+ try {
+ if (now() > now + inactivityPeriod) {
+ LOG.warn("HadoopThriftServer Inactivity period of " +
+ inactivityPeriod + " expired... Stopping Server.");
+ shutdown(-1);
+ }
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ try {
+ Thread.sleep(inactivityRecheckInterval);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ /**
+ * HadoopThriftServer
+ *
+ * Constructor for the HadoopThriftServer glue with Thrift Class.
+ *
+ * @param name - the name of this handler
+ */
+ public HadoopThriftHandler(String name) {
+ conf = new Configuration();
+ now = now();
+ try {
+ inactivityThread = new Daemon(new InactivityMonitor());
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ LOG.warn("Unable to open hadoop file system...");
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+
+ /**
+ * printStackTrace
+ *
+ * Helper function to print an exception stack trace to the log and not stderr
+ *
+ * @param e the exception
+ *
+ */
+ static private void printStackTrace(Exception e) {
+ for(StackTraceElement s: e.getStackTrace()) {
+ LOG.error(s);
+ }
+ }
+
+ /**
+ * Lookup a thrift object into a hadoop object
+ */
+ private synchronized Object lookup(long id) {
+ return hadoopHash.get(new Long(id));
+ }
+
+ /**
+ * Insert a thrift object into a hadoop object. Return its id.
+ */
+ private synchronized long insert(Object o) {
+ nextId++;
+ hadoopHash.put(nextId, o);
+ return nextId;
+ }
+
+ /**
+ * Delete a thrift object from the hadoop store.
+ */
+ private synchronized Object remove(long id) {
+ return hadoopHash.remove(new Long(id));
+ }
+
+ /**
+ * Implement the API exported by this thrift server
+ */
+
+ /** Set inactivity timeout period. The period is specified in seconds.
+ * if there are no RPC calls to the HadoopThrift server for this much
+ * time, then the server kills itself.
+ */
+ public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
+ inactivityPeriod = periodInSeconds * 1000; // in milli seconds
+ if (inactivityRecheckInterval > inactivityPeriod ) {
+ inactivityRecheckInterval = inactivityPeriod;
+ }
+ }
+
+
+ /**
+ * Create a file and open it for writing
+ */
+ public ThriftHandle create(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("create: " + path);
+ FSDataOutputStream out = fs.create(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Create a file and open it for writing, delete file if it exists
+ */
+ public ThriftHandle createFile(Pathname path,
+ short mode,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("create: " + path +
+ " permission: " + mode +
+ " overwrite: " + overwrite +
+ " bufferSize: " + bufferSize +
+ " replication: " + replication +
+ " blockSize: " + blockSize);
+ FSDataOutputStream out = fs.create(new Path(path.pathname),
+ new FsPermission(mode),
+ overwrite,
+ bufferSize,
+ replication,
+ blockSize,
+ null); // progress
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Opens an existing file and returns a handle to read it
+ */
+ public ThriftHandle open(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("open: " + path);
+ FSDataInputStream out = fs.open(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Opens an existing file to append to it.
+ */
+ public ThriftHandle append(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("append: " + path);
+ FSDataOutputStream out = fs.append(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * write to a file
+ */
+ public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("write: " + tout.id);
+ FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
+ byte[] tmp = data.getBytes("UTF-8");
+ out.write(tmp, 0, tmp.length);
+ HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
+ return true;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * read from a file
+ */
+ public String read(ThriftHandle tout, long offset,
+ int length) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("read: " + tout.id +
+ " offset: " + offset +
+ " length: " + length);
+ FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
+ if (in.getPos() != offset) {
+ in.seek(offset);
+ }
+ byte[] tmp = new byte[length];
+ int numbytes = in.read(offset, tmp, 0, length);
+ HadoopThriftHandler.LOG.debug("read done: " + tout.id);
+ return new String(tmp, 0, numbytes, "UTF-8");
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Delete a file/directory
+ */
+ public boolean rm(Pathname path, boolean recursive)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("rm: " + path +
+ " recursive: " + recursive);
+ boolean ret = fs.delete(new Path(path.pathname), recursive);
+ HadoopThriftHandler.LOG.debug("rm: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Move a file/directory
+ */
+ public boolean rename(Pathname path, Pathname dest)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("rename: " + path +
+ " destination: " + dest);
+ boolean ret = fs.rename(new Path(path.pathname),
+ new Path(dest.pathname));
+ HadoopThriftHandler.LOG.debug("rename: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * close file
+ */
+ public boolean close(ThriftHandle tout) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("close: " + tout.id);
+ Object obj = remove(tout.id);
+ if (obj instanceof FSDataOutputStream) {
+ FSDataOutputStream out = (FSDataOutputStream)obj;
+ out.close();
+ } else if (obj instanceof FSDataInputStream) {
+ FSDataInputStream in = (FSDataInputStream)obj;
+ in.close();
+ } else {
+ throw new ThriftIOException("Unknown thrift handle.");
+ }
+ HadoopThriftHandler.LOG.debug("closed: " + tout.id);
+ return true;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Create a directory
+ */
+ public boolean mkdirs(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("mkdirs: " + path);
+ boolean ret = fs.mkdirs(new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("mkdirs: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Does this pathname exist?
+ */
+ public boolean exists(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("exists: " + path);
+ boolean ret = fs.exists(new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("exists done: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Returns status about the specified pathname
+ */
+ public org.apache.hadoop.thriftfs.api.FileStatus stat(
+ Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("stat: " + path);
+ org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
+ new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("stat done: " + path);
+ return new org.apache.hadoop.thriftfs.api.FileStatus(
+ stat.getPath().toString(),
+ stat.getLen(),
+ stat.isDir(),
+ stat.getReplication(),
+ stat.getBlockSize(),
+ stat.getModificationTime(),
+ stat.getPermission().toString(),
+ stat.getOwner(),
+ stat.getGroup());
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * If the specified pathname is a directory, then return the
+ * list of pathnames in this directory
+ */
+ public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
+ Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("listStatus: " + path);
+
+ org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
+ new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("listStatus done: " + path);
+ org.apache.hadoop.thriftfs.api.FileStatus tmp;
+ List<org.apache.hadoop.thriftfs.api.FileStatus> value =
+ new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
+
+ for (int i = 0; i < stat.length; i++) {
+ tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
+ stat[i].getPath().toString(),
+ stat[i].getLen(),
+ stat[i].isDir(),
+ stat[i].getReplication(),
+ stat[i].getBlockSize(),
+ stat[i].getModificationTime(),
+ stat[i].getPermission().toString(),
+ stat[i].getOwner(),
+ stat[i].getGroup());
+ value.add(tmp);
+ }
+ return value;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the permission of a pathname
+ */
+ public void chmod(Pathname path, short mode) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("chmod: " + path +
+ " mode " + mode);
+ fs.setPermission(new Path(path.pathname), new FsPermission(mode));
+ HadoopThriftHandler.LOG.debug("chmod done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the owner & group of a pathname
+ */
+ public void chown(Pathname path, String owner, String group)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("chown: " + path +
+ " owner: " + owner +
+ " group: " + group);
+ fs.setOwner(new Path(path.pathname), owner, group);
+ HadoopThriftHandler.LOG.debug("chown done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the replication factor of a file
+ */
+ public void setReplication(Pathname path, short repl) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("setrepl: " + path +
+ " replication factor: " + repl);
+ fs.setReplication(new Path(path.pathname), repl);
+ HadoopThriftHandler.LOG.debug("setrepl done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+
+ }
+
+ /**
+ * Returns the block locations of this file
+ */
+ public List<org.apache.hadoop.thriftfs.api.BlockLocation>
+ getFileBlockLocations(Pathname path, long start, long length)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
+
+ org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
+ new Path(path.pathname));
+
+ org.apache.hadoop.fs.BlockLocation[] stat =
+ fs.getFileBlockLocations(status, start, length);
+ HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
+
+ org.apache.hadoop.thriftfs.api.BlockLocation tmp;
+ List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
+ new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
+
+ for (int i = 0; i < stat.length; i++) {
+
+ // construct the list of hostnames from the array returned
+ // by HDFS
+ List<String> hosts = new LinkedList<String>();
+ String[] hostsHdfs = stat[i].getHosts();
+ for (int j = 0; j < hostsHdfs.length; j++) {
+ hosts.add(hostsHdfs[j]);
+ }
+
+ // construct the list of host:port from the array returned
+ // by HDFS
+ List<String> names = new LinkedList<String>();
+ String[] namesHdfs = stat[i].getNames();
+ for (int j = 0; j < namesHdfs.length; j++) {
+ names.add(namesHdfs[j]);
+ }
+ tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
+ hosts, names, stat[i].getOffset(), stat[i].getLength());
+ value.add(tmp);
+ }
+ return value;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+ }
+
+ // Bind to port. If the specified port is 0, then bind to random port.
+ private ServerSocket createServerSocket(int port) throws IOException {
+ try {
+ ServerSocket sock = new ServerSocket();
+ // Prevent 2MSL delay problem on server restarts
+ sock.setReuseAddress(true);
+ // Bind to listening port
+ if (port == 0) {
+ sock.bind(null);
+ serverPort = sock.getLocalPort();
+ } else {
+ sock.bind(new InetSocketAddress(port));
+ }
+ return sock;
+ } catch (IOException ioe) {
+ throw new IOException("Could not create ServerSocket on port " + port + "." +
+ ioe);
+ }
+ }
+
+ /**
+ * Constrcts a server object
+ */
+ public HadoopThriftServer(String [] args) {
+
+ if (args.length > 0) {
+ serverPort = new Integer(args[0]);
+ }
+ try {
+ ServerSocket ssock = createServerSocket(serverPort);
+ TServerTransport serverTransport = new TServerSocket(ssock);
+ Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
+ ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
+ TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+ options.minWorkerThreads = 10;
+ server = new TThreadPoolServer(processor, serverTransport,
+ new TTransportFactory(),
+ new TTransportFactory(),
+ new TBinaryProtocol.Factory(),
+ new TBinaryProtocol.Factory(),
+ options);
+ System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
+ HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
+ System.out.flush();
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+
+ public static void main(String [] args) {
+ HadoopThriftServer me = new HadoopThriftServer(args);
+ me.server.serve();
+ }
+};
+
Added: hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java?rev=690096&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java Thu Aug 28 21:31:57 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.thriftfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+/**
+ * This class is supposed to test ThriftHadoopFileSystem but has a long long
+ * way to go.
+ */
+public class TestThriftfs extends TestCase
+{
+ final static int numDatanodes = 1;
+
+ public TestThriftfs() throws IOException
+ {
+ }
+
+ public void testServer() throws IOException
+ {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
+ cluster.waitActive();
+ DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ HadoopThriftServer server = new HadoopThriftServer();
+ server.close();
+ }
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestThriftfs().testServer();
+ }
+
+}