You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2010/11/29 03:46:41 UTC
svn commit: r1040005 - in
/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode:
FSImageCompression.java FSImageFormat.java FSImageSerialization.java
Author: eli
Date: Mon Nov 29 02:46:41 2010
New Revision: 1040005
URL: http://svn.apache.org/viewvc?rev=1040005&view=rev
Log:
Add missing files for HDFS-1473.
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java?rev=1040005&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java Mon Nov 29 02:46:41 2010
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Simple container class that handles support for compressed fsimage files.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class FSImageCompression {
+
+ /** Codec to use to save or load image, or null if the image is not compressed */
+ private CompressionCodec imageCodec;
+
+ /**
+ * Create a "noop" compression - i.e. uncompressed
+ */
+ private FSImageCompression() {
+ }
+
+ /**
+ * Create compression using a particular codec
+ */
+ private FSImageCompression(CompressionCodec codec) {
+ imageCodec = codec;
+ }
+
+ /** Build an instance with no codec attached, i.e. for uncompressed images. */
+ public static FSImageCompression createNoopCompression() {
+   // The no-arg constructor leaves the codec unset (null).
+   final FSImageCompression uncompressed = new FSImageCompression();
+   return uncompressed;
+ }
+
+ /**
+  * Build the compression instance selected by the user's configuration.
+  * The codec class name is only looked up when image compression is enabled.
+  * @param conf configuration holding the image compression settings
+  * @return a noop instance if compression is disabled, else one for the codec
+  * @throws IOException if the specified codec is not available.
+  */
+ public static FSImageCompression createCompression(Configuration conf)
+     throws IOException {
+   final boolean enabled = conf.getBoolean(
+       DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY,
+       DFSConfigKeys.DFS_IMAGE_COMPRESS_DEFAULT);
+   if (enabled) {
+     final String codecName = conf.get(
+         DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+         DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_DEFAULT);
+     return createCompression(conf, codecName);
+   }
+   return createNoopCompression();
+ }
+
+ /**
+ * Create a compression instance using the codec specified by
+ * <code>codecClassName</code>
+ */
+ private static FSImageCompression createCompression(Configuration conf,
+ String codecClassName)
+ throws IOException {
+
+ CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+ CompressionCodec codec = factory.getCodecByClassName(codecClassName);
+ if (codec == null) {
+ throw new IOException("Not a supported codec: " + codecClassName);
+ }
+
+ return new FSImageCompression(codec);
+ }
+
+ /**
+  * Create a compression instance based on a header read from an input stream.
+  * The header is a boolean flag; only when it is true is it followed by the
+  * codec class name.
+  * @throws IOException if the specified codec is not available or the
+  * underlying IO fails.
+  */
+ public static FSImageCompression readCompressionHeader(
+     Configuration conf, DataInputStream dis) throws IOException {
+   final boolean compressed = dis.readBoolean();
+   if (compressed) {
+     // Resolve the codec named in the header through the codec factory.
+     final String codecName = Text.readString(dis);
+     return createCompression(conf, codecName);
+   }
+   // Nothing else to read from the header of an uncompressed image.
+   return createNoopCompression();
+ }
+
+ /**
+  * Unwrap a compressed input stream by wrapping it with a decompressor based
+  * on this codec. If this instance represents no compression, simply adds
+  * buffering to the input stream.
+  * NOTE(review): no explicit buffering is added on the codec path; presumably
+  * the codec's stream buffers internally - confirm.
+  * @return a stream that provides uncompressed data
+  * @throws IOException If the decompressor cannot be instantiated or an IO
+  * error occurs.
+  */
+ public DataInputStream unwrapInputStream(InputStream is) throws IOException {
+   final InputStream plain = (imageCodec == null)
+       ? new BufferedInputStream(is) : imageCodec.createInputStream(is);
+   return new DataInputStream(plain);
+ }
+
+ /**
+  * Write out a header to the given stream that indicates the chosen
+  * compression codec, and return the same stream wrapped with that codec.
+  * If no codec is specified, simply adds buffering to the stream. The header
+  * is a boolean flag followed, for a compressed image, by the codec's
+  * canonical class name.
+  *
+  * @param os The stream to write header to and wrap. This stream should
+  * be unbuffered.
+  * @return A stream wrapped with the specified compressor, or buffering
+  * if compression is not enabled.
+  * @throws IOException if an IO error occurs or the compressor cannot be
+  * instantiated
+  */
+ public DataOutputStream writeHeaderAndWrapStream(OutputStream os)
+     throws IOException {
+   final boolean compressed = (imageCodec != null);
+   // The header goes straight to the raw stream, ahead of any wrapping.
+   final DataOutputStream header = new DataOutputStream(os);
+   header.writeBoolean(compressed);
+
+   if (!compressed) {
+     // use a buffered output stream
+     return new DataOutputStream(new BufferedOutputStream(os));
+   }
+
+   Text.writeString(header, imageCodec.getClass().getCanonicalName());
+   return new DataOutputStream(imageCodec.createOutputStream(os));
+ }
+
+ /** Describe the codec in use, or state that there is no compression. */
+ @Override
+ public String toString() {
+   if (imageCodec == null) {
+     return "no compression";
+   }
+   return "codec " + imageCodec.getClass().getCanonicalName();
+ }
+}
\ No newline at end of file
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1040005&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Nov 29 02:46:41 2010
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Contains inner classes for reading or writing the on-disk format for FSImages
+ */
+public abstract class FSImageFormat {
+ private static final Log LOG = FSImage.LOG; // reuses FSImage's logger rather than creating one
+
+ /**
+ * A one-shot class responsible for loading an image. The load() function
+ * should be called once, after which the getter methods may be used to retrieve
+ * information about the image that was loaded, if loading was successful.
+ */
+ public static class Loader {
+ private final Configuration conf; // passed to FSImageCompression.readCompressionHeader() in load()
+
+ /** Set to true once load() has completed without throwing. */
+ private boolean loaded = false;
+
+ /** The image version read from the header of the loaded file */
+ private int imgVersion;
+ /** The namespace ID read from the header of the loaded file */
+ private int imgNamespaceID;
+ /** The MD5 sum computed over the bytes read from the loaded file */
+ private MD5Hash imgDigest;
+ // The constructor only stores the configuration; no file is read until load().
+ public Loader(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Return the version number read from the header of the loaded image.
+ * @throws IllegalStateException if load() has not yet completed successfully.
+ */
+ int getLoadedImageVersion() {
+ checkLoaded();
+ return imgVersion;
+ }
+
+ /**
+ * Return the MD5 checksum computed while load() read the image file.
+ * @throws IllegalStateException if load() has not yet completed successfully.
+ */
+ MD5Hash getLoadedImageMd5() {
+ checkLoaded();
+ return imgDigest;
+ }
+
+ /**
+ * Return the namespace ID read from the header of the loaded image.
+ * @throws IllegalStateException if load() has not yet completed successfully.
+ */
+ int getLoadedNamespaceID() {
+ checkLoaded();
+ return imgNamespaceID;
+ }
+
+ /** Throw IllegalStateException unless load() has already completed. */
+ private void checkLoaded() {
+   if (loaded) {
+     return;
+   }
+   // Reached only while the loaded flag is still false.
+   throw new IllegalStateException("Image not yet loaded!");
+ }
+
+ /** Throw IllegalStateException if load() has already completed. */
+ private void checkNotLoaded() {
+   if (!loaded) {
+     return;
+   }
+   // The loader is one-shot: loading again after a completed load is rejected.
+   throw new IllegalStateException("Image already loaded!");
+ }
+ /** Load the image in curFile into targetNamesystem; records its version, namespace ID and MD5 digest. */
+ void load(File curFile, FSNamesystem targetNamesystem)
+ throws IOException
+ {
+ checkNotLoaded();
+ assert curFile != null : "curFile is null";
+
+ long startTime = now();
+ FSDirectory fsDir = targetNamesystem.dir;
+
+ //
+ // Load in bits; the digest stream feeds every byte read from the file to the MD5 digester
+ //
+ MessageDigest digester = MD5Hash.getDigester();
+ DigestInputStream fin = new DigestInputStream(
+ new FileInputStream(curFile), digester);
+
+ DataInputStream in = new DataInputStream(fin);
+ try {
+ /*
+ * Note: Remove any checks for version earlier than
+ * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get
+ * to here with older images.
+ */
+
+ /*
+ * TODO we need to change format of the image file
+ * it should not contain version and namespace fields
+ */
+ // read image version: first appeared in version -1
+ imgVersion = in.readInt();
+
+ // read namespaceID: first appeared in version -2
+ imgNamespaceID = in.readInt();
+
+ // read number of files
+ long numFiles = readNumFiles(in);
+
+ // read in the last generation stamp.
+ if (imgVersion <= -12) {
+ long genstamp = in.readLong();
+ targetNamesystem.setGenerationStamp(genstamp);
+ }
+
+ // read compression related info
+ FSImageCompression compression;
+ if (imgVersion <= -25) { // -25: 1st version providing compression option
+ compression = FSImageCompression.readCompressionHeader(conf, in);
+ } else {
+ compression = FSImageCompression.createNoopCompression();
+ }
+ in = compression.unwrapInputStream(fin); // everything after the header is read through this stream
+
+ LOG.info("Loading image file " + curFile + " using " + compression);
+
+
+ // read file info
+ short replication = targetNamesystem.getDefaultReplication();
+
+ LOG.info("Number of files = " + numFiles);
+
+ byte[][] pathComponents;
+ byte[][] parentPath = {{}};
+ INodeDirectory parentINode = fsDir.rootDir;
+ for (long i = 0; i < numFiles; i++) {
+ long modificationTime = 0;
+ long atime = 0;
+ long blockSize = 0;
+ pathComponents = FSImageSerialization.readPathComponents(in);
+ replication = in.readShort();
+ replication = targetNamesystem.adjustReplication(replication);
+ modificationTime = in.readLong();
+ if (imgVersion <= -17) {
+ atime = in.readLong();
+ }
+ if (imgVersion <= -8) {
+ blockSize = in.readLong();
+ }
+ int numBlocks = in.readInt();
+ Block blocks[] = null;
+
+ // for older versions, a blocklist of size 0
+ // indicates a directory.
+ if ((-9 <= imgVersion && numBlocks > 0) ||
+ (imgVersion < -9 && numBlocks >= 0)) {
+ blocks = new Block[numBlocks];
+ for (int j = 0; j < numBlocks; j++) {
+ blocks[j] = new Block();
+ if (-14 < imgVersion) {
+ blocks[j].set(in.readLong(), in.readLong(),
+ GenerationStamp.GRANDFATHER_GENERATION_STAMP);
+ } else {
+ blocks[j].readFields(in);
+ }
+ }
+ }
+ // Older versions of HDFS do not store the block size in the inode.
+ // If the file has more than one block, use the size of the
+ // first block as the blocksize. Otherwise use the larger of the default
+ // block size and the size of the single block (0 if there is no block).
+ if (-8 <= imgVersion && blockSize == 0) {
+ if (numBlocks > 1) {
+ blockSize = blocks[0].getNumBytes();
+ } else {
+ long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
+ blockSize = Math.max(targetNamesystem.getDefaultBlockSize(), first);
+ }
+ }
+
+ // get quota only when the node is a directory
+ long nsQuota = -1L;
+ if (imgVersion <= -16 && blocks == null && numBlocks == -1) {
+ nsQuota = in.readLong();
+ }
+ long dsQuota = -1L;
+ if (imgVersion <= -18 && blocks == null && numBlocks == -1) {
+ dsQuota = in.readLong();
+ }
+
+ // Read the symlink only when the node is a symlink
+ String symlink = "";
+ if (imgVersion <= -23 && numBlocks == -2) {
+ symlink = Text.readString(in);
+ }
+
+ PermissionStatus permissions = targetNamesystem.getUpgradePermission();
+ if (imgVersion <= -11) {
+ permissions = PermissionStatus.read(in);
+ }
+
+ if (isRoot(pathComponents)) { // it is the root
+ // update the root's attributes
+ if (nsQuota != -1 || dsQuota != -1) {
+ fsDir.rootDir.setQuota(nsQuota, dsQuota);
+ }
+ fsDir.rootDir.setModificationTime(modificationTime);
+ fsDir.rootDir.setPermissionStatus(permissions);
+ continue;
+ }
+ // check if the new inode belongs to the same parent
+ if(!isParent(pathComponents, parentPath)) {
+ parentINode = null;
+ parentPath = getParent(pathComponents);
+ }
+ // add new inode
+ // without propagating modification time to parent
+ parentINode = fsDir.addToParent(pathComponents, parentINode, permissions,
+ blocks, symlink, replication, modificationTime,
+ atime, nsQuota, dsQuota, blockSize, false);
+ }
+
+ // load datanode info
+ this.loadDatanodes(in);
+
+ // load Files Under Construction
+ this.loadFilesUnderConstruction(in, targetNamesystem);
+
+ this.loadSecretManagerState(in, targetNamesystem);
+
+ } finally {
+ in.close();
+ }
+
+ imgDigest = new MD5Hash(digester.digest()); // reached only if nothing above threw
+ loaded = true;
+
+ LOG.info("Image file of size " + curFile.length() + " loaded in "
+ + (now() - startTime)/1000 + " seconds.");
+ }
+
+
+ /** Skip over the datanode section, which only image versions -3 to -11 contain. */
+ private void loadDatanodes(DataInputStream in) throws IOException {
+   // Versions above -3 predate the section; -12 and below no longer store it.
+   if (imgVersion > -3 || imgVersion <= -12) {
+     return;
+   }
+
+   // We don't need to add these descriptors any more; they are read and dropped.
+   for (int remaining = in.readInt(); remaining > 0; remaining--) {
+     FSImageSerialization.DatanodeImage.skipOne(in);
+   }
+ }
+
+ private void loadFilesUnderConstruction(DataInputStream in,
+ FSNamesystem fs) throws IOException {
+ FSDirectory fsDir = fs.dir;
+ if (imgVersion > -13) // pre lease image version
+ return;
+ int size = in.readInt();
+
+ LOG.info("Number of files under construction = " + size);
+
+ for (int i = 0; i < size; i++) {
+ INodeFileUnderConstruction cons =
+ FSImageSerialization.readINodeUnderConstruction(in);
+
+ // verify that file exists in namespace
+ String path = cons.getLocalName();
+ INode old = fsDir.getFileINode(path);
+ if (old == null) {
+ throw new IOException("Found lease for non-existent file " + path);
+ }
+ if (old.isDirectory()) {
+ throw new IOException("Found lease for directory " + path);
+ }
+ INodeFile oldnode = (INodeFile) old;
+ fsDir.replaceNode(path, oldnode, cons);
+ fs.leaseManager.addLease(cons.getClientName(), path);
+ }
+ }
+
+ /** Load the secret manager state, which image versions -23 and below store. */
+ private void loadSecretManagerState(DataInputStream in,
+     FSNamesystem fs) throws IOException {
+   if (imgVersion <= -23) {
+     fs.loadSecretManagerState(in);
+   }
+   // Otherwise SecretManagerState is not available.
+   // This must not happen if security is turned on.
+ }
+
+
+ private long readNumFiles(DataInputStream in) throws IOException {
+ if (imgVersion <= -16) {
+ return in.readLong();
+ } else {
+ return in.readInt();
+ }
+ }
+
+ /** True when the path consists of exactly one component and that is null. */
+ private boolean isRoot(byte[][] path) {
+   return (path.length == 1) && (path[0] == null);
+ }
+
+ /** True if parent is non-empty and equals path without its last component. */
+ private boolean isParent(byte[][] path, byte[][] parent) {
+   if (path == null || parent == null || parent.length == 0
+       || path.length != parent.length + 1) {
+     return false;
+   }
+   for (int depth = 0; depth < parent.length; depth++) {
+     if (!Arrays.equals(path[depth], parent[depth])) return false;
+   }
+   return true;
+ }
+
+ /**
+ * Return string representing the parent of the given path.
+ */
+ String getParent(String path) {
+ return path.substring(0, path.lastIndexOf(Path.SEPARATOR));
+ }
+
+ byte[][] getParent(byte[][] path) {
+ byte[][] result = new byte[path.length - 1][];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = new byte[path[i].length];
+ System.arraycopy(path[i], 0, result[i], 0, path[i].length);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * A one-shot class responsible for writing an image file.
+ * The write() function should be called once, after which the getter
+ * functions may be used to retrieve information about the file that was written.
+ */
+ static class Writer {
+ /** Set to true once write() has completed without throwing */
+ private boolean written = false;
+
+ /** The MD5 checksum computed over the bytes written to the image file */
+ private MD5Hash writtenDigest;
+ /** Path separator as bytes; saveImage() puts it in front of each child name */
+ static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
+
+ /** @throws IllegalStateException if the instance has not yet written an image */
+ private void checkWritten() {
+   // Passes only once the written flag has been set by a completed write().
+   if (written) return;
+   throw new IllegalStateException("FSImageWriter has not written an image");
+ }
+
+ /** @throws IllegalStateException if the instance has already written an image */
+ private void checkNotWritten() {
+   // The writer is one-shot: writing again after a completed write() is rejected.
+   if (!written) return;
+   throw new IllegalStateException("FSImageWriter has already written an image");
+ }
+
+ /**
+ * Return the MD5 checksum of the saved image file; throws IllegalStateException until write() has completed.
+ */
+ MD5Hash getWrittenDigest() {
+ checkWritten();
+ return writtenDigest;
+ }
+ /** Write the namespace of sourceNamesystem to newFile using the given compression; records the file's MD5 digest. */
+ void write(File newFile,
+ FSNamesystem sourceNamesystem,
+ FSImageCompression compression)
+ throws IOException {
+ checkNotWritten();
+
+ FSDirectory fsDir = sourceNamesystem.dir;
+ long startTime = now();
+ //
+ // Write out data; the digest stream feeds every byte written to the file to the MD5 digester
+ //
+ MessageDigest digester = MD5Hash.getDigester();
+ FileOutputStream fout = new FileOutputStream(newFile);
+ DigestOutputStream fos = new DigestOutputStream(fout, digester);
+ DataOutputStream out = new DataOutputStream(fos);
+ try {
+ out.writeInt(FSConstants.LAYOUT_VERSION);
+ out.writeInt(sourceNamesystem.getFSImage().getNamespaceID()); // TODO bad dependency
+ out.writeLong(fsDir.rootDir.numItemsInTree());
+ out.writeLong(sourceNamesystem.getGenerationStamp());
+
+ // write compression info and set up compressed stream
+ out = compression.writeHeaderAndWrapStream(fos);
+ LOG.info("Saving image file " + newFile +
+ " using " + compression);
+
+ // scratch buffer that holds the path of the inode currently being saved
+ byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
+ ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
+ // save the root (the buffer position is still 0, so its name is written with length 0)
+ FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+ // save the rest of the nodes
+ saveImage(strbuf, 0, fsDir.rootDir, out);
+ sourceNamesystem.saveFilesUnderConstruction(out);
+ sourceNamesystem.saveSecretManagerState(out);
+ strbuf = null;
+ // NOTE(review): confirm flush() pushes a codec stream's pending data to the file before force()
+ out.flush();
+ fout.getChannel().force(true);
+ } finally {
+ out.close();
+ }
+
+ written = true;
+ // set md5 of the saved image
+ writtenDigest = new MD5Hash(digester.digest());
+
+ LOG.info("Image file of size " + newFile.length() + " saved in "
+ + (now() - startTime)/1000 + " seconds.");
+ }
+
+ /**
+ * Save file tree image starting from the given root.
+ * This is a recursive procedure, which first saves all children of
+ * a current directory and then moves inside the sub-directories.
+ */
+ private static void saveImage(ByteBuffer parentPrefix,
+ int prefixLength,
+ INodeDirectory current,
+ DataOutputStream out) throws IOException {
+ int newPrefixLength = prefixLength;
+ if (current.getChildrenRaw() == null)
+ return;
+ for(INode child : current.getChildren()) {
+ // print all children first
+ parentPrefix.position(prefixLength);
+ parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+ FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+ }
+ for(INode child : current.getChildren()) {
+ if(!child.isDirectory())
+ continue;
+ parentPrefix.position(prefixLength);
+ parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+ newPrefixLength = parentPrefix.position();
+ saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+ }
+ parentPrefix.position(prefixLength);
+ }
+ }
+}
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1040005&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Nov 29 02:46:41 2010
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DeprecatedUTF8;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Static utility functions for serializing various pieces of data in the correct
+ * format for the FSImage file.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class FSImageSerialization {
+
+ /**
+ * To reduce allocation, the helper objects in TLData are reused across calls.
+ * The original authors note that image-saving is multithreaded, so the methods
+ * in this class must be thread-safe; each thread therefore gets its own TLData.
+ */
+ static private final ThreadLocal<TLData> TL_DATA =
+ new ThreadLocal<TLData>() {
+ @Override
+ protected TLData initialValue() {
+ return new TLData(); // created on a thread's first TL_DATA.get()
+ }
+ };
+
+ /**
+ * Simple container "struct" for threadlocal data.
+ */
+ static private final class TLData {
+ final DeprecatedUTF8 U_STR = new DeprecatedUTF8(); // scratch string used by readString/writeString/readBytes/readPathComponents
+ final FsPermission FILE_PERM = new FsPermission((short) 0); // scratch permission refilled by saveINode2Image
+ }
+
+ // Helper function that reads in an INodeUnderConstruction
+ // from the input stream
+ //
+ static INodeFileUnderConstruction readINodeUnderConstruction(
+ DataInputStream in) throws IOException {
+ byte[] name = readBytes(in);
+ short blockReplication = in.readShort();
+ long modificationTime = in.readLong();
+ long preferredBlockSize = in.readLong();
+ int numBlocks = in.readInt();
+ BlockInfo[] blocks = new BlockInfo[numBlocks];
+ Block blk = new Block();
+ int i = 0;
+ for (; i < numBlocks-1; i++) {
+ blk.readFields(in);
+ blocks[i] = new BlockInfo(blk, blockReplication);
+ }
+ // last block is UNDER_CONSTRUCTION
+ if(numBlocks > 0) {
+ blk.readFields(in);
+ blocks[i] = new BlockInfoUnderConstruction(
+ blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+ }
+ PermissionStatus perm = PermissionStatus.read(in);
+ String clientName = readString(in);
+ String clientMachine = readString(in);
+
+ // These locations are not used at all
+ int numLocs = in.readInt();
+ DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
+ for (i = 0; i < numLocs; i++) {
+ locations[i] = new DatanodeDescriptor();
+ locations[i].readFields(in);
+ }
+
+ return new INodeFileUnderConstruction(name,
+ blockReplication,
+ modificationTime,
+ preferredBlockSize,
+ blocks,
+ perm,
+ clientName,
+ clientMachine,
+ null);
+ }
+
+ // Helper function that writes an INodeUnderConstruction
+ // into the input stream
+ //
+ static void writeINodeUnderConstruction(DataOutputStream out,
+ INodeFileUnderConstruction cons,
+ String path)
+ throws IOException {
+ writeString(path, out);
+ out.writeShort(cons.getReplication());
+ out.writeLong(cons.getModificationTime());
+ out.writeLong(cons.getPreferredBlockSize());
+ int nrBlocks = cons.getBlocks().length;
+ out.writeInt(nrBlocks);
+ for (int i = 0; i < nrBlocks; i++) {
+ cons.getBlocks()[i].write(out);
+ }
+ cons.getPermissionStatus().write(out);
+ writeString(cons.getClientName(), out);
+ writeString(cons.getClientMachine(), out);
+
+ out.writeInt(0); // do not store locations of last block
+ }
+
+ /*
+ * Save one inode's attributes to the image.
+ */
+ static void saveINode2Image(ByteBuffer name,
+ INode node,
+ DataOutputStream out) throws IOException {
+ int nameLen = name.position();
+ out.writeShort(nameLen);
+ out.write(name.array(), name.arrayOffset(), nameLen);
+ FsPermission filePerm = TL_DATA.get().FILE_PERM;
+ if (node.isDirectory()) {
+ out.writeShort(0); // replication
+ out.writeLong(node.getModificationTime());
+ out.writeLong(0); // access time
+ out.writeLong(0); // preferred block size
+ out.writeInt(-1); // # of blocks
+ out.writeLong(node.getNsQuota());
+ out.writeLong(node.getDsQuota());
+ filePerm.fromShort(node.getFsPermissionShort());
+ PermissionStatus.write(out, node.getUserName(),
+ node.getGroupName(),
+ filePerm);
+ } else if (node.isLink()) {
+ out.writeShort(0); // replication
+ out.writeLong(0); // modification time
+ out.writeLong(0); // access time
+ out.writeLong(0); // preferred block size
+ out.writeInt(-2); // # of blocks
+ Text.writeString(out, ((INodeSymlink)node).getLinkValue());
+ filePerm.fromShort(node.getFsPermissionShort());
+ PermissionStatus.write(out, node.getUserName(),
+ node.getGroupName(),
+ filePerm);
+ } else {
+ INodeFile fileINode = (INodeFile)node;
+ out.writeShort(fileINode.getReplication());
+ out.writeLong(fileINode.getModificationTime());
+ out.writeLong(fileINode.getAccessTime());
+ out.writeLong(fileINode.getPreferredBlockSize());
+ Block[] blocks = fileINode.getBlocks();
+ out.writeInt(blocks.length);
+ for (Block blk : blocks)
+ blk.write(out);
+ filePerm.fromShort(fileINode.getFsPermissionShort());
+ PermissionStatus.write(out, fileINode.getUserName(),
+ fileINode.getGroupName(),
+ filePerm);
+ }
+ }
+
+ // This should be reverted to package private once the ImageLoader code is
+ // moved into this package. This method should not be called by other code.
+ /** Read one string from the stream via the thread-local DeprecatedUTF8. */
+ @SuppressWarnings("deprecation")
+ public static String readString(DataInputStream in) throws IOException {
+   final DeprecatedUTF8 scratch = TL_DATA.get().U_STR;
+   scratch.readFields(in);
+   return scratch.toString();
+ }
+ /** Read a string as readString() does, but return null if it is empty. */
+ static String readString_EmptyAsNull(DataInputStream in) throws IOException {
+   final String text = readString(in);
+   return text.isEmpty() ? null : text;
+ }
+
+ @SuppressWarnings("deprecation")
+ static void writeString(String str, DataOutputStream out) throws IOException {
+ DeprecatedUTF8 ustr = TL_DATA.get().U_STR;
+ ustr.set(str);
+ ustr.write(out);
+ }
+
+
+ // Same comments apply for this method as for readString()
+ /** Read one DeprecatedUTF8 value and return a copy of its getLength() bytes. */
+ @SuppressWarnings("deprecation")
+ public static byte[] readBytes(DataInputStream in) throws IOException {
+   final DeprecatedUTF8 scratch = TL_DATA.get().U_STR;
+   scratch.readFields(in);
+   final byte[] copy = new byte[scratch.getLength()];
+   System.arraycopy(scratch.getBytes(), 0, copy, 0, copy.length);
+   return copy;
+ }
+
+ /**
+  * Read the path from the image and convert it to byte[][] directly;
+  * this saves us an array copy and conversions to and from String.
+  * @param in stream to read the serialized path from
+  * @return the array each element of which is a byte[] representation
+  * of a path component
+  * @throws IOException if reading the path from the stream fails
+  */
+ @SuppressWarnings("deprecation")
+ public static byte[][] readPathComponents(DataInputStream in)
+     throws IOException {
+   final DeprecatedUTF8 scratch = TL_DATA.get().U_STR;
+   scratch.readFields(in);
+   final byte[] raw = scratch.getBytes();
+   final int rawLength = scratch.getLength();
+   return DFSUtil.bytes2byteArray(raw, rawLength, (byte) Path.SEPARATOR_CHAR);
+ }
+
+ /**
+  * DatanodeImage is used to store persistent information
+  * about datanodes into the fsImage.
+  */
+ static class DatanodeImage implements Writable {
+   DatanodeDescriptor node = new DatanodeDescriptor();
+
+   /** Read one serialized datanode entry from the stream and discard it. */
+   static void skipOne(DataInput in) throws IOException {
+     new DatanodeImage().readFields(in);
+   }
+
+   /////////////////////////////////////////////////
+   // Writable
+   /////////////////////////////////////////////////
+   /**
+    * Public method that serializes the information about a Datanode to be
+    * stored in the fsImage: its DatanodeID followed by four numeric fields.
+    */
+   public void write(DataOutput out) throws IOException {
+     final DatanodeID id = new DatanodeID(node);
+     id.write(out);
+     out.writeLong(node.getCapacity());
+     out.writeLong(node.getRemaining());
+     out.writeLong(node.getLastUpdate());
+     out.writeInt(node.getXceiverCount());
+   }
+
+   /**
+    * Public method that reads a serialized Datanode from the fsImage.
+    * The descriptor is only updated after all fields have been read.
+    */
+   public void readFields(DataInput in) throws IOException {
+     final DatanodeID id = new DatanodeID();
+     id.readFields(in);
+     final long capacity = in.readLong();
+     final long remaining = in.readLong();
+     final long lastUpdate = in.readLong();
+     final int xceiverCount = in.readInt();
+     // update the DatanodeDescriptor with the data we read in
+     node.updateRegInfo(id);
+     node.setStorageID(id.getStorageID());
+     node.setCapacity(capacity);
+     node.setRemaining(remaining);
+     node.setLastUpdate(lastUpdate);
+     node.setXceiverCount(xceiverCount);
+   }
+ }
+}
\ No newline at end of file