Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [17/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Aug 19 23:49:39 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@@ -69,6 +70,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -80,6 +82,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
@@ -424,7 +427,6 @@ public class FSEditLog implements LogsPu
/**
* Wait if an automatic sync is scheduled
- * @throws InterruptedException
*/
synchronized void waitIfAutoSyncScheduled() {
try {
@@ -698,12 +700,19 @@ public class FSEditLog implements LogsPu
.setBlocks(newNode.getBlocks())
.setPermissionStatus(permissions)
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
- .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
+ .setClientMachine(
+ newNode.getFileUnderConstructionFeature().getClientMachine());
AclFeature f = newNode.getAclFeature();
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
+
+ XAttrFeature x = newNode.getXAttrFeature();
+ if (x != null) {
+ op.setXAttrs(x.getXAttrs());
+ }
+
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@@ -759,6 +768,11 @@ public class FSEditLog implements LogsPu
if (f != null) {
op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode));
}
+
+ XAttrFeature x = newNode.getXAttrFeature();
+ if (x != null) {
+ op.setXAttrs(x.getXAttrs());
+ }
logEdit(op);
}
@@ -802,7 +816,8 @@ public class FSEditLog implements LogsPu
/** Add set namespace quota record to edit log
*
* @param src the string representation of the path to a directory
- * @param quota the directory size limit
+ * @param nsQuota namespace quota
+ * @param dsQuota diskspace quota
*/
void logSetQuota(String src, long nsQuota, long dsQuota) {
SetQuotaOp op = SetQuotaOp.getInstance(cache.get())
@@ -1050,6 +1065,22 @@ public class FSEditLog implements LogsPu
op.aclEntries = entries;
logEdit(op);
}
+
+ void logSetXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
+ final SetXAttrOp op = SetXAttrOp.getInstance();
+ op.src = src;
+ op.xAttrs = xAttrs;
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
+ }
+
+ void logRemoveXAttrs(String src, List<XAttr> xAttrs, boolean toLogRpcIds) {
+ final RemoveXAttrOp op = RemoveXAttrOp.getInstance();
+ op.src = src;
+ op.xAttrs = xAttrs;
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
+ }
/**
* Get all the journals this edit log is currently operating on.
@@ -1182,7 +1213,7 @@ public class FSEditLog implements LogsPu
* Finalize the current log segment.
* Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
*/
- synchronized void endCurrentLogSegment(boolean writeEndTxn) {
+ public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
LOG.info("Ending log segment " + curSegmentTxId);
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);
@@ -1452,8 +1483,9 @@ public class FSEditLog implements LogsPu
* Select a list of input streams.
*
* @param fromTxId first transaction in the selected streams
- * @param toAtLeast the selected streams must contain this transaction
- * @param inProgessOk set to true if in-progress streams are OK
+ * @param toAtLeastTxId the selected streams must contain this transaction
+ * @param recovery recovery context
+ * @param inProgressOk set to true if in-progress streams are OK
*/
public synchronized Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
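
The logSetXAttrs and logRemoveXAttrs methods added above are what the NameNode calls to journal extended-attribute changes. As a minimal, self-contained sketch (not part of this commit; the path and attribute name are hypothetical), the public FileSystem calls that ultimately produce OP_SET_XATTR and OP_REMOVE_XATTR records look like:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.XAttrSetFlag;

    public class XAttrEditLogDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/tmp/xattr-demo"); // hypothetical path
        fs.create(p).close();
        // On the NameNode this is journaled via FSEditLog#logSetXAttrs.
        fs.setXAttr(p, "user.origin", "demo".getBytes("UTF-8"),
            EnumSet.of(XAttrSetFlag.CREATE));
        // And this via FSEditLog#logRemoveXAttrs.
        fs.removeXAttr(p, "user.origin");
      }
    }

Note that both log methods take toLogRpcIds, so retried client RPCs can be recognized when the log is replayed.
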
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Aug 19 23:49:39 2014
@@ -25,12 +25,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.EnumMap;
+import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -76,6 +78,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
@@ -351,6 +355,7 @@ public class FSEditLogLoader {
lastInodeId);
newFile = fsDir.unprotectedAddFile(inodeId,
path, addCloseOp.permissions, addCloseOp.aclEntries,
+ addCloseOp.xAttrs,
replication, addCloseOp.mtime, addCloseOp.atime,
addCloseOp.blockSize, true, addCloseOp.clientName,
addCloseOp.clientMachine);
@@ -371,8 +376,7 @@ public class FSEditLogLoader {
"for append");
}
LocatedBlock lb = fsNamesys.prepareFileForWrite(path,
- oldFile, addCloseOp.clientName, addCloseOp.clientMachine, null,
- false, iip.getLatestSnapshotId(), false);
+ oldFile, addCloseOp.clientName, addCloseOp.clientMachine, false, iip.getLatestSnapshotId(), false);
newFile = INodeFile.valueOf(fsDir.getINode(path),
path, true);
@@ -735,7 +739,13 @@ public class FSEditLogLoader {
}
case OP_ROLLING_UPGRADE_FINALIZE: {
final long finalizeTime = ((RollingUpgradeOp) op).getTime();
- fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
+ if (fsNamesys.isRollingUpgrade()) {
+ // Only do it when NN is actually doing rolling upgrade.
+ // We can get FINALIZE without corresponding START, if NN is restarted
+ // before this op is consumed and a new checkpoint is created.
+ fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
+ }
+ fsNamesys.getFSImage().updateStorageVersion();
fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
NameNodeFile.IMAGE);
break;
@@ -798,6 +808,25 @@ public class FSEditLogLoader {
fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries);
break;
}
+ case OP_SET_XATTR: {
+ SetXAttrOp setXAttrOp = (SetXAttrOp) op;
+ fsDir.unprotectedSetXAttrs(setXAttrOp.src, setXAttrOp.xAttrs,
+ EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE));
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
+ }
+ break;
+ }
+ case OP_REMOVE_XATTR: {
+ RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
+ fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src,
+ removeXAttrOp.xAttrs);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
+ removeXAttrOp.rpcCallId);
+ }
+ break;
+ }
default:
throw new IOException("Invalid operation read " + op.opCode);
}
@@ -992,9 +1021,6 @@ public class FSEditLogLoader {
* If there are invalid or corrupt transactions in the middle of the stream,
* validateEditLog will skip over them.
* This reads through the stream but does not close it.
- *
- * @throws IOException if the stream cannot be read due to an IO error (eg
- * if the log does not exist)
*/
static EditLogValidation validateEditLog(EditLogInputStream in) {
long lastPos = 0;
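
When the loader above replays OP_SET_XATTR it passes EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE), so the edit applies cleanly whether or not the attribute already exists. A standalone sketch of those flag semantics (not HDFS code; the map-backed store and Flag enum are invented for illustration and merely mirror org.apache.hadoop.fs.XAttrSetFlag):

    import java.util.EnumSet;
    import java.util.HashMap;
    import java.util.Map;

    public class XAttrFlagDemo {
      // Mirrors org.apache.hadoop.fs.XAttrSetFlag, for illustration only.
      enum Flag { CREATE, REPLACE }

      private final Map<String, byte[]> attrs = new HashMap<String, byte[]>();

      void set(String name, byte[] value, EnumSet<Flag> flags) {
        boolean exists = attrs.containsKey(name);
        if (exists && !flags.contains(Flag.REPLACE)) {
          throw new IllegalStateException("XAttr already exists: " + name);
        }
        if (!exists && !flags.contains(Flag.CREATE)) {
          throw new IllegalStateException("XAttr does not exist: " + name);
        }
        attrs.put(name, value);
      }

      public static void main(String[] args) {
        XAttrFlagDemo store = new XAttrFlagDemo();
        EnumSet<Flag> both = EnumSet.of(Flag.CREATE, Flag.REPLACE);
        store.set("user.a", new byte[] {1}, both); // creates
        store.set("user.a", new byte[] {2}, both); // replaces; replay stays idempotent
      }
    }

The retry-cache entries added in the same hunks serve the complementary purpose: they let a retried client RPC be answered from the cache instead of being executed a second time.
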
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Aug 19 23:49:39 2014
@@ -40,6 +40,7 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_XATTR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -54,6 +55,7 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_PERMISSIONS;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_QUOTA;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_REPLICATION;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_XATTR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_START_LOG_SEGMENT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SYMLINK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_TIMES;
@@ -79,6 +81,8 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
@@ -95,6 +99,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEditLogProto;
+import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.XAttrEditLogProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
@@ -111,7 +116,7 @@ import org.apache.hadoop.io.WritableFact
import org.apache.hadoop.ipc.ClientId;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.DataChecksum;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
@@ -186,6 +191,8 @@ public abstract class FSEditLogOp {
OP_ROLLING_UPGRADE_START, "start"));
inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
+ inst.put(OP_SET_XATTR, new SetXAttrOp());
+ inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -375,6 +382,16 @@ public abstract class FSEditLogOp {
}
}
+ private static List<XAttr> readXAttrsFromEditLog(DataInputStream in,
+ int logVersion) throws IOException {
+ if (!NameNodeLayoutVersion.supports(NameNodeLayoutVersion.Feature.XATTRS,
+ logVersion)) {
+ return null;
+ }
+ XAttrEditLogProto proto = XAttrEditLogProto.parseDelimitedFrom(in);
+ return PBHelper.convertXAttrs(proto.getXAttrsList());
+ }
+
@SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
int length;
@@ -387,6 +404,7 @@ public abstract class FSEditLogOp {
Block[] blocks;
PermissionStatus permissions;
List<AclEntry> aclEntries;
+ List<XAttr> xAttrs;
String clientName;
String clientMachine;
@@ -454,6 +472,11 @@ public abstract class FSEditLogOp {
return (T)this;
}
+ <T extends AddCloseOp> T setXAttrs(List<XAttr> xAttrs) {
+ this.xAttrs = xAttrs;
+ return (T)this;
+ }
+
<T extends AddCloseOp> T setClientName(String clientName) {
this.clientName = clientName;
return (T)this;
@@ -477,6 +500,9 @@ public abstract class FSEditLogOp {
if (this.opCode == OP_ADD) {
AclEditLogUtil.write(aclEntries, out);
+ XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+ b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
+ b.build().writeDelimitedTo(out);
FSImageSerialization.writeString(clientName,out);
FSImageSerialization.writeString(clientMachine,out);
// write clientId and callId
@@ -539,9 +565,9 @@ public abstract class FSEditLogOp {
this.blocks = readBlocks(in, logVersion);
this.permissions = PermissionStatus.read(in);
- // clientname, clientMachine and block locations of last block.
if (this.opCode == OP_ADD) {
aclEntries = AclEditLogUtil.read(in, logVersion);
+ this.xAttrs = readXAttrsFromEditLog(in, logVersion);
this.clientName = FSImageSerialization.readString(in);
this.clientMachine = FSImageSerialization.readString(in);
// read clientId and callId
@@ -666,8 +692,8 @@ public abstract class FSEditLogOp {
}
/**
- * {@literal @AtMostOnce} for {@link ClientProtocol#startFile} and
- * {@link ClientProtocol#appendFile}
+ * {@literal @AtMostOnce} for {@link ClientProtocol#create} and
+ * {@link ClientProtocol#append}
*/
static class AddOp extends AddCloseOp {
private AddOp() {
@@ -1336,6 +1362,7 @@ public abstract class FSEditLogOp {
long timestamp;
PermissionStatus permissions;
List<AclEntry> aclEntries;
+ List<XAttr> xAttrs;
private MkdirOp() {
super(OP_MKDIR);
@@ -1370,6 +1397,11 @@ public abstract class FSEditLogOp {
return this;
}
+ MkdirOp setXAttrs(List<XAttr> xAttrs) {
+ this.xAttrs = xAttrs;
+ return this;
+ }
+
@Override
public
void writeFields(DataOutputStream out) throws IOException {
@@ -1379,6 +1411,9 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeLong(timestamp, out); // atime, unused at this
permissions.write(out);
AclEditLogUtil.write(aclEntries, out);
+ XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+ b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
+ b.build().writeDelimitedTo(out);
}
@Override
@@ -1423,6 +1458,8 @@ public abstract class FSEditLogOp {
this.permissions = PermissionStatus.read(in);
aclEntries = AclEditLogUtil.read(in, logVersion);
+
+ xAttrs = readXAttrsFromEditLog(in, logVersion);
}
@Override
@@ -1444,6 +1481,8 @@ public abstract class FSEditLogOp {
builder.append(opCode);
builder.append(", txid=");
builder.append(txid);
+ builder.append(", xAttrs=");
+ builder.append(xAttrs);
builder.append("]");
return builder.toString();
}
@@ -1461,6 +1500,9 @@ public abstract class FSEditLogOp {
if (aclEntries != null) {
appendAclEntriesToXml(contentHandler, aclEntries);
}
+ if (xAttrs != null) {
+ appendXAttrsToXml(contentHandler, xAttrs);
+ }
}
@Override void fromXml(Stanza st) throws InvalidXmlException {
@@ -1470,6 +1512,7 @@ public abstract class FSEditLogOp {
this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
this.permissions = permissionStatusFromXml(st);
aclEntries = readAclEntriesFromXml(st);
+ xAttrs = readXAttrsFromXml(st);
}
}
@@ -3490,6 +3533,100 @@ public abstract class FSEditLogOp {
return builder.toString();
}
}
+
+ static class RemoveXAttrOp extends FSEditLogOp {
+ List<XAttr> xAttrs;
+ String src;
+
+ private RemoveXAttrOp() {
+ super(OP_REMOVE_XATTR);
+ }
+
+ static RemoveXAttrOp getInstance() {
+ return new RemoveXAttrOp();
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion) throws IOException {
+ XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
+ src = p.getSrc();
+ xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
+ readRpcIds(in, logVersion);
+ }
+
+ @Override
+ public void writeFields(DataOutputStream out) throws IOException {
+ XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+ if (src != null) {
+ b.setSrc(src);
+ }
+ b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
+ b.build().writeDelimitedTo(out);
+ // clientId and callId
+ writeRpcIds(rpcClientId, rpcCallId, out);
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "SRC", src);
+ appendXAttrsToXml(contentHandler, xAttrs);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
+
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
+ src = st.getValue("SRC");
+ xAttrs = readXAttrsFromXml(st);
+ readRpcIdsFromXml(st);
+ }
+ }
+
+ static class SetXAttrOp extends FSEditLogOp {
+ List<XAttr> xAttrs;
+ String src;
+
+ private SetXAttrOp() {
+ super(OP_SET_XATTR);
+ }
+
+ static SetXAttrOp getInstance() {
+ return new SetXAttrOp();
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion) throws IOException {
+ XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in);
+ src = p.getSrc();
+ xAttrs = PBHelper.convertXAttrs(p.getXAttrsList());
+ readRpcIds(in, logVersion);
+ }
+
+ @Override
+ public void writeFields(DataOutputStream out) throws IOException {
+ XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder();
+ if (src != null) {
+ b.setSrc(src);
+ }
+ b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs));
+ b.build().writeDelimitedTo(out);
+ // clientId and callId
+ writeRpcIds(rpcClientId, rpcCallId, out);
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "SRC", src);
+ appendXAttrsToXml(contentHandler, xAttrs);
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
+
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
+ src = st.getValue("SRC");
+ xAttrs = readXAttrsFromXml(st);
+ readRpcIdsFromXml(st);
+ }
+ }
static class SetAclOp extends FSEditLogOp {
List<AclEntry> aclEntries = Lists.newArrayList();
@@ -3652,7 +3789,7 @@ public abstract class FSEditLogOp {
public Writer(DataOutputBuffer out) {
this.buf = out;
- this.checksum = new PureJavaCrc32();
+ this.checksum = DataChecksum.newCrc32();
}
/**
@@ -3703,7 +3840,7 @@ public abstract class FSEditLogOp {
this.logVersion = logVersion;
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
- this.checksum = new PureJavaCrc32();
+ this.checksum = DataChecksum.newCrc32();
} else {
this.checksum = null;
}
@@ -4106,4 +4243,48 @@ public abstract class FSEditLogOp {
}
return aclEntries;
}
+
+ private static void appendXAttrsToXml(ContentHandler contentHandler,
+ List<XAttr> xAttrs) throws SAXException {
+ for (XAttr xAttr: xAttrs) {
+ contentHandler.startElement("", "", "XATTR", new AttributesImpl());
+ XMLUtils.addSaxString(contentHandler, "NAMESPACE",
+ xAttr.getNameSpace().toString());
+ XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName());
+ if (xAttr.getValue() != null) {
+ try {
+ XMLUtils.addSaxString(contentHandler, "VALUE",
+ XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX));
+ } catch (IOException e) {
+ throw new SAXException(e);
+ }
+ }
+ contentHandler.endElement("", "", "XATTR");
+ }
+ }
+
+ private static List<XAttr> readXAttrsFromXml(Stanza st)
+ throws InvalidXmlException {
+ if (!st.hasChildren("XATTR")) {
+ return null;
+ }
+
+ List<Stanza> stanzas = st.getChildren("XATTR");
+ List<XAttr> xattrs = Lists.newArrayListWithCapacity(stanzas.size());
+ for (Stanza a: stanzas) {
+ XAttr.Builder builder = new XAttr.Builder();
+ builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))).
+ setName(a.getValue("NAME"));
+ String v = a.getValueOrNull("VALUE");
+ if (v != null) {
+ try {
+ builder.setValue(XAttrCodec.decodeValue(v));
+ } catch (IOException e) {
+ throw new InvalidXmlException(e.toString());
+ }
+ }
+ xattrs.add(builder.build());
+ }
+ return xattrs;
+ }
}
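
In the XML form of the edit log (consumed by the offline edits viewer), appendXAttrsToXml above hex-encodes binary attribute values with XAttrCodec, and readXAttrsFromXml decodes them. A small round-trip sketch using the same public codec the diff calls (the value bytes are arbitrary illustrative data):

    import org.apache.hadoop.fs.XAttrCodec;

    public class XAttrValueCodecDemo {
      public static void main(String[] args) throws Exception {
        byte[] raw = new byte[] {(byte) 0xCA, (byte) 0xFE};
        // Encode as the XML VALUE element does (a hex string such as "0xcafe").
        String hex = XAttrCodec.encodeValue(raw, XAttrCodec.HEX);
        // Decode as readXAttrsFromXml does.
        byte[] back = XAttrCodec.decodeValue(hex);
        System.out.println(hex + " decodes to " + back.length + " bytes");
      }
    }

The binary form of SetXAttrOp and RemoveXAttrOp, by contrast, is a length-delimited XAttrEditLogProto (writeDelimitedTo/parseDelimitedFrom) followed by the RPC client id and call id.
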
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,8 @@ public enum FSEditLogOpCodes {
OP_SET_ACL ((byte) 40),
OP_ROLLING_UPGRADE_START ((byte) 41),
OP_ROLLING_UPGRADE_FINALIZE ((byte) 42),
+ OP_SET_XATTR ((byte) 43),
+ OP_REMOVE_XATTR ((byte) 44),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
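
The two new opcodes take the next free byte values after OP_ROLLING_UPGRADE_FINALIZE. An illustrative lookup (assuming the enum's existing fromByte helper, which the edit log reader uses to resolve a serialized opcode byte):

    import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;

    public class OpCodeDemo {
      public static void main(String[] args) {
        System.out.println(FSEditLogOpCodes.fromByte((byte) 43)); // OP_SET_XATTR
        System.out.println(FSEditLogOpCodes.fromByte((byte) 44)); // OP_REMOVE_XATTR
      }
    }
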
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Aug 19 23:49:39 2014
@@ -156,7 +156,7 @@ public class FSImage implements Closeabl
* directory to allow them to format anyway. Otherwise, returns
* false, unless 'force' is specified.
*
- * @param force format regardless of whether dirs exist
+ * @param force if true, format regardless of whether dirs exist
* @param interactive prompt the user when a dir exists
* @return true if formatting should proceed
* @throws IOException if some storage cannot be accessed
@@ -214,10 +214,18 @@ public class FSImage implements Closeabl
int layoutVersion = storage.getLayoutVersion();
+ if (startOpt == StartupOption.METADATAVERSION) {
+ System.out.println("HDFS Image Version: " + layoutVersion);
+ System.out.println("Software format version: " +
+ HdfsConstants.NAMENODE_LAYOUT_VERSION);
+ return false;
+ }
+
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
+ && startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
@@ -227,7 +235,7 @@ public class FSImage implements Closeabl
+ HdfsConstants.NAMENODE_LAYOUT_VERSION + " is required.\n"
+ "Please restart NameNode with the \""
+ RollingUpgradeStartupOption.STARTED.getOptionString()
- + "\" option if a rolling upgraded is already started;"
+ + "\" option if a rolling upgrade is already started;"
+ " or restart NameNode with the \""
+ StartupOption.UPGRADE.getName() + "\" option to start"
+ " a new upgrade.");
@@ -256,6 +264,7 @@ public class FSImage implements Closeabl
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
+ case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
@@ -289,6 +298,12 @@ public class FSImage implements Closeabl
storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
+ if (startOpt == StartupOption.METADATAVERSION) {
+ /* All we need is the layout version. */
+ storage.readProperties(sd);
+ return true;
+ }
+
try {
curState = sd.analyzeStorage(startOpt, storage);
// sd is locked but not opened
@@ -495,7 +510,6 @@ public class FSImage implements Closeabl
FSImage realImage = target.getFSImage();
FSImage ckptImage = new FSImage(conf,
checkpointDirs, checkpointEditsDirs);
- target.dir.fsImage = ckptImage;
// load from the checkpoint dirs
try {
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
@@ -507,7 +521,6 @@ public class FSImage implements Closeabl
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
realImage.initEditLog(StartupOption.IMPORT);
- target.dir.fsImage = realImage;
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
// and save it but keep the same checkpointTime
@@ -542,7 +555,7 @@ public class FSImage implements Closeabl
}
@VisibleForTesting
- void setEditLogForTesting(FSEditLog newLog) {
+ public void setEditLogForTesting(FSEditLog newLog) {
editLog = newLog;
}
@@ -737,11 +750,13 @@ public class FSImage implements Closeabl
editLog.recoverUnclosedStreams();
} else if (HAUtil.isHAEnabled(conf, nameserviceId)
&& (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY
|| RollingUpgradeStartupOption.ROLLBACK.matches(startOpt))) {
// This NN is HA, but we're doing an upgrade or a rollback of rolling
// upgrade so init the edit log for write.
editLog.initJournalsForWrite();
- if (startOpt == StartupOption.UPGRADE) {
+ if (startOpt == StartupOption.UPGRADE
+ || startOpt == StartupOption.UPGRADEONLY) {
long sharedLogCTime = editLog.getSharedLogCTime();
if (this.storage.getCTime() < sharedLogCTime) {
throw new IOException("It looks like the shared log is already " +
@@ -935,6 +950,25 @@ public class FSImage implements Closeabl
}
/**
+ * Save FSimage in the legacy format. This is not for NN consumption,
+ * but for tools like OIV.
+ */
+ public void saveLegacyOIVImage(FSNamesystem source, String targetDir,
+ Canceler canceler) throws IOException {
+ FSImageCompression compression =
+ FSImageCompression.createCompression(conf);
+ long txid = getLastAppliedOrWrittenTxId();
+ SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid,
+ canceler);
+ FSImageFormat.Saver saver = new FSImageFormat.Saver(ctx);
+ String imageFileName = NNStorage.getLegacyOIVImageFileName(txid);
+ File imageFile = new File(targetDir, imageFileName);
+ saver.save(imageFile, compression);
+ archivalManager.purgeOldLegacyOIVImages(targetDir, txid);
+ }
+
+
+ /**
* FSImageSaver is being run in a separate thread when saving
* FSImage. There is one thread per each copy of the image.
*
@@ -992,6 +1026,13 @@ public class FSImage implements Closeabl
}
/**
+ * Update version of all storage directories.
+ */
+ public synchronized void updateStorageVersion() throws IOException {
+ storage.writeAll();
+ }
+
+ /**
* @see #saveNamespace(FSNamesystem, Canceler)
*/
public synchronized void saveNamespace(FSNamesystem source)
@@ -1002,7 +1043,6 @@ public class FSImage implements Closeabl
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
- * @param canceler
*/
public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
Canceler canceler) throws IOException {
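
The new StartupOption.METADATAVERSION branches above back the NameNode's "-metadataVersion" startup flag: the NameNode reads the storage properties of one directory, prints the on-disk and software layout versions, and exits without loading the image. The invocation and output have roughly this shape (the version numbers are illustrative):

    $ hdfs namenode -metadataVersion
    HDFS Image Version: -56
    Software format version: -56
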
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Aug 19 23:49:39 2014
@@ -21,14 +21,20 @@ import static org.apache.hadoop.util.Tim
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
import java.security.MessageDigest;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -50,8 +56,8 @@ import org.apache.hadoop.hdfs.server.blo
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -60,17 +66,115 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
-import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
- * This class loads and stores the FSImage of the NameNode. The file
- * src/main/proto/fsimage.proto describes the on-disk layout of the FSImage.
+ * Contains inner classes for reading or writing the on-disk format for
+ * FSImages.
+ *
+ * In particular, the format of the FSImage looks like:
+ * <pre>
+ * FSImage {
+ * layoutVersion: int, namespaceID: int, numberItemsInFSDirectoryTree: long,
+ * namesystemGenerationStampV1: long, namesystemGenerationStampV2: long,
+ * generationStampAtBlockIdSwitch:long, lastAllocatedBlockId:
+ * long transactionID: long, snapshotCounter: int, numberOfSnapshots: int,
+ * numOfSnapshottableDirs: int,
+ * {FSDirectoryTree, FilesUnderConstruction, SecretManagerState} (can be compressed)
+ * }
+ *
+ * FSDirectoryTree (if {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported) {
+ * INodeInfo of root, numberOfChildren of root: int
+ * [list of INodeInfo of root's children],
+ * [list of INodeDirectoryInfo of root's directory children]
+ * }
+ *
+ * FSDirectoryTree (if {@link Feature#FSIMAGE_NAME_OPTIMIZATION} not supported){
+ * [list of INodeInfo of INodes in topological order]
+ * }
+ *
+ * INodeInfo {
+ * {
+ * localName: short + byte[]
+ * } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is supported
+ * or
+ * {
+ * fullPath: byte[]
+ * } when {@link Feature#FSIMAGE_NAME_OPTIMIZATION} is not supported
+ * replicationFactor: short, modificationTime: long,
+ * accessTime: long, preferredBlockSize: long,
+ * numberOfBlocks: int (-1 for INodeDirectory, -2 for INodeSymLink),
+ * {
+ * nsQuota: long, dsQuota: long,
+ * {
+ * isINodeSnapshottable: byte,
+ * isINodeWithSnapshot: byte (if isINodeSnapshottable is false)
+ * } (when {@link Feature#SNAPSHOT} is supported),
+ * fsPermission: short, PermissionStatus
+ * } for INodeDirectory
+ * or
+ * {
+ * symlinkString, fsPermission: short, PermissionStatus
+ * } for INodeSymlink
+ * or
+ * {
+ * [list of BlockInfo]
+ * [list of FileDiff]
+ * {
+ * isINodeFileUnderConstructionSnapshot: byte,
+ * {clientName: short + byte[], clientMachine: short + byte[]} (when
+ * isINodeFileUnderConstructionSnapshot is true),
+ * } (when {@link Feature#SNAPSHOT} is supported and writing snapshotINode),
+ * fsPermission: short, PermissionStatus
+ * } for INodeFile
+ * }
+ *
+ * INodeDirectoryInfo {
+ * fullPath of the directory: short + byte[],
+ * numberOfChildren: int, [list of INodeInfo of children INode],
+ * {
+ * numberOfSnapshots: int,
+ * [list of Snapshot] (when NumberOfSnapshots is positive),
+ * numberOfDirectoryDiffs: int,
+ * [list of DirectoryDiff] (NumberOfDirectoryDiffs is positive),
+ * number of children that are directories,
+ * [list of INodeDirectoryInfo of the directory children] (includes
+ * snapshot copies of deleted sub-directories)
+ * } (when {@link Feature#SNAPSHOT} is supported),
+ * }
+ *
+ * Snapshot {
+ * snapshotID: int, root of Snapshot: INodeDirectoryInfo (its local name is
+ * the name of the snapshot)
+ * }
+ *
+ * DirectoryDiff {
+ * full path of the root of the associated Snapshot: short + byte[],
+ * childrenSize: int,
+ * isSnapshotRoot: byte,
+ * snapshotINodeIsNotNull: byte (when isSnapshotRoot is false),
+ * snapshotINode: INodeDirectory (when SnapshotINodeIsNotNull is true), Diff
+ * }
+ *
+ * Diff {
+ * createdListSize: int, [Local name of INode in created list],
+ * deletedListSize: int, [INode in deleted list: INodeInfo]
+ * }
+ *
+ * FileDiff {
+ * full path of the root of the associated Snapshot: short + byte[],
+ * fileSize: long,
+ * snapshotINodeIsNotNull: byte,
+ * snapshotINode: INodeFile (when SnapshotINodeIsNotNull is true), Diff
+ * }
+ * </pre>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -449,21 +553,17 @@ public class FSImageFormat {
if (!toLoadSubtree) {
return;
}
-
+
// Step 2. Load snapshots if parent is snapshottable
int numSnapshots = in.readInt();
if (numSnapshots >= 0) {
- final INodeDirectorySnapshottable snapshottableParent
- = INodeDirectorySnapshottable.valueOf(parent, parent.getLocalName());
// load snapshots and snapshotQuota
- SnapshotFSImageFormat.loadSnapshotList(snapshottableParent,
- numSnapshots, in, this);
- if (snapshottableParent.getSnapshotQuota() > 0) {
+ SnapshotFSImageFormat.loadSnapshotList(parent, numSnapshots, in, this);
+ if (parent.getDirectorySnapshottableFeature().getSnapshotQuota() > 0) {
// add the directory to the snapshottable directory list in
// SnapshotManager. Note that we only add root when its snapshot quota
// is positive.
- this.namesystem.getSnapshotManager().addSnapshottable(
- snapshottableParent);
+ this.namesystem.getSnapshotManager().addSnapshottable(parent);
}
}
@@ -484,7 +584,7 @@ public class FSImageFormat {
/**
* Load all children of a directory
*
- * @param in
+ * @param in input to load from
* @param counter Counter to increment for namenode startup progress
* @return number of child inodes read
* @throws IOException
@@ -494,7 +594,7 @@ public class FSImageFormat {
// Rename .snapshot paths if we're doing an upgrade
parentPath = renameReservedPathsOnUpgrade(parentPath, getLayoutVersion());
final INodeDirectory parent = INodeDirectory.valueOf(
- namesystem.dir.rootDir.getNode(parentPath, true), parentPath);
+ namesystem.dir.getNode(parentPath, true), parentPath);
return loadChildren(parent, in, counter);
}
@@ -514,6 +614,16 @@ public class FSImageFormat {
INodeDirectory parentINode = fsDir.rootDir;
for (long i = 0; i < numFiles; i++) {
pathComponents = FSImageSerialization.readPathComponents(in);
+ for (int j=0; j < pathComponents.length; j++) {
+ byte[] newComponent = renameReservedComponentOnUpgrade
+ (pathComponents[j], getLayoutVersion());
+ if (!Arrays.equals(newComponent, pathComponents[j])) {
+ String oldPath = DFSUtil.byteArray2PathString(pathComponents);
+ pathComponents[j] = newComponent;
+ String newPath = DFSUtil.byteArray2PathString(pathComponents);
+ LOG.info("Renaming reserved path " + oldPath + " to " + newPath);
+ }
+ }
final INode newNode = loadINode(
pathComponents[pathComponents.length-1], false, in, counter);
@@ -580,6 +690,11 @@ public class FSImageFormat {
}
}
+ /** @return The FSDirectory of the namesystem where the fsimage is loaded */
+ public FSDirectory getFSDirectoryInLoading() {
+ return namesystem.dir;
+ }
+
public INode loadINodeWithLocalName(boolean isSnapshotINode, DataInput in,
boolean updateINodeMap) throws IOException {
return loadINodeWithLocalName(isSnapshotINode, in, updateINodeMap, null);
@@ -653,7 +768,7 @@ public class FSImageFormat {
clientName = FSImageSerialization.readString(in);
clientMachine = FSImageSerialization.readString(in);
// convert the last block to BlockUC
- if (blocks != null && blocks.length > 0) {
+ if (blocks.length > 0) {
BlockInfo lastBlk = blocks[blocks.length - 1];
blocks[blocks.length - 1] = new BlockInfoUnderConstruction(
lastBlk, replication);
@@ -671,7 +786,7 @@ public class FSImageFormat {
final INodeFile file = new INodeFile(inodeId, localName, permissions,
modificationTime, atime, blocks, replication, blockSize);
if (underConstruction) {
- file.toUnderConstruction(clientName, clientMachine, null);
+ file.toUnderConstruction(clientName, clientMachine);
}
return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
} else if (numBlocks == -1) {
@@ -710,7 +825,10 @@ public class FSImageFormat {
if (withSnapshot) {
dir.addSnapshotFeature(null);
}
- return snapshottable ? new INodeDirectorySnapshottable(dir) : dir;
+ if (snapshottable) {
+ dir.addSnapshottableFeature();
+ }
+ return dir;
} else if (numBlocks == -2) {
//symlink
@@ -767,7 +885,7 @@ public class FSImageFormat {
final long preferredBlockSize = in.readLong();
return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
- accessTime, replication, preferredBlockSize);
+ accessTime, replication, preferredBlockSize, null);
}
public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@@ -787,10 +905,10 @@ public class FSImageFormat {
final long nsQuota = in.readLong();
final long dsQuota = in.readLong();
- return nsQuota == -1L && dsQuota == -1L?
- new INodeDirectoryAttributes.SnapshotCopy(name, permissions, null, modificationTime)
+ return nsQuota == -1L && dsQuota == -1L ? new INodeDirectoryAttributes.SnapshotCopy(
+ name, permissions, null, modificationTime, null)
: new INodeDirectoryAttributes.CopyWithQuota(name, permissions,
- null, modificationTime, nsQuota, dsQuota);
+ null, modificationTime, nsQuota, dsQuota, null);
}
private void loadFilesUnderConstruction(DataInput in,
@@ -818,13 +936,13 @@ public class FSImageFormat {
oldnode = namesystem.dir.getInode(cons.getId()).asFile();
inSnapshot = true;
} else {
+ path = renameReservedPathsOnUpgrade(path, getLayoutVersion());
final INodesInPath iip = fsDir.getLastINodeInPath(path);
oldnode = INodeFile.valueOf(iip.getINode(0), path);
}
FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
- oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
- uc.getClientNode());
+ oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
if (oldnode.numBlocks() > 0) {
BlockInfo ucBlock = cons.getLastBlock();
// we do not replace the inode, just replace the last block of oldnode
@@ -1009,7 +1127,7 @@ public class FSImageFormat {
+ " option to automatically rename these paths during upgrade.";
/**
- * Same as {@link #renameReservedPathsOnUpgrade}, but for a single
+ * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single
* byte array path component.
*/
private static byte[] renameReservedComponentOnUpgrade(byte[] component,
@@ -1018,7 +1136,6 @@ public class FSImageFormat {
if (!NameNodeLayoutVersion.supports(Feature.SNAPSHOT, layoutVersion)) {
if (Arrays.equals(component, HdfsConstants.DOT_SNAPSHOT_DIR_BYTES)) {
Preconditions.checkArgument(
- renameReservedMap != null &&
renameReservedMap.containsKey(HdfsConstants.DOT_SNAPSHOT_DIR),
RESERVED_ERROR_MSG);
component =
@@ -1030,7 +1147,7 @@ public class FSImageFormat {
}
/**
- * Same as {@link #renameReservedPathsOnUpgrade}, but for a single
+ * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single
* byte array path component.
*/
private static byte[] renameReservedRootComponentOnUpgrade(byte[] component,
@@ -1039,7 +1156,6 @@ public class FSImageFormat {
if (!NameNodeLayoutVersion.supports(Feature.ADD_INODE_ID, layoutVersion)) {
if (Arrays.equals(component, FSDirectory.DOT_RESERVED)) {
Preconditions.checkArgument(
- renameReservedMap != null &&
renameReservedMap.containsKey(FSDirectory.DOT_RESERVED_STRING),
RESERVED_ERROR_MSG);
final String renameString = renameReservedMap
@@ -1052,4 +1168,269 @@ public class FSImageFormat {
}
return component;
}
+
+ /**
+ * A one-shot class responsible for writing an image file.
+ * The write() function should be called once, after which the getter
+ * functions may be used to retrieve information about the file that was written.
+ *
+ * This is replaced by the PB-based FSImage. The class is to maintain
+ * compatibility for the external fsimage tool.
+ */
+ @Deprecated
+ static class Saver {
+ private static final int LAYOUT_VERSION = -51;
+ private final SaveNamespaceContext context;
+ /** Set to true once an image has been written */
+ private boolean saved = false;
+
+ /** The MD5 checksum of the file that was written */
+ private MD5Hash savedDigest;
+ private final ReferenceMap referenceMap = new ReferenceMap();
+
+ private final Map<Long, INodeFile> snapshotUCMap =
+ new HashMap<Long, INodeFile>();
+
+ /** @throws IllegalStateException if the instance has not yet saved an image */
+ private void checkSaved() {
+ if (!saved) {
+ throw new IllegalStateException("FSImageSaver has not saved an image");
+ }
+ }
+
+ /** @throws IllegalStateException if the instance has already saved an image */
+ private void checkNotSaved() {
+ if (saved) {
+ throw new IllegalStateException("FSImageSaver has already saved an image");
+ }
+ }
+
+
+ Saver(SaveNamespaceContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Return the MD5 checksum of the image file that was saved.
+ */
+ MD5Hash getSavedDigest() {
+ checkSaved();
+ return savedDigest;
+ }
+
+ void save(File newFile, FSImageCompression compression) throws IOException {
+ checkNotSaved();
+
+ final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
+ final INodeDirectory rootDir = sourceNamesystem.dir.rootDir;
+ final long numINodes = rootDir.getDirectoryWithQuotaFeature()
+ .getSpaceConsumed().get(Quota.NAMESPACE);
+ String sdPath = newFile.getParentFile().getParentFile().getAbsolutePath();
+ Step step = new Step(StepType.INODES, sdPath);
+ StartupProgress prog = NameNode.getStartupProgress();
+ prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+ prog.setTotal(Phase.SAVING_CHECKPOINT, step, numINodes);
+ Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
+ long startTime = now();
+ //
+ // Write out data
+ //
+ MessageDigest digester = MD5Hash.getDigester();
+ FileOutputStream fout = new FileOutputStream(newFile);
+ DigestOutputStream fos = new DigestOutputStream(fout, digester);
+ DataOutputStream out = new DataOutputStream(fos);
+ try {
+ out.writeInt(LAYOUT_VERSION);
+ LayoutFlags.write(out);
+ // We use the non-locked version of getNamespaceInfo here since
+ // the coordinating thread of saveNamespace already has read-locked
+ // the namespace for us. If we attempt to take another readlock
+ // from the actual saver thread, there's a potential of a
+ // fairness-related deadlock. See the comments on HDFS-2223.
+ out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
+ .getNamespaceID());
+ out.writeLong(numINodes);
+ out.writeLong(sourceNamesystem.getGenerationStampV1());
+ out.writeLong(sourceNamesystem.getGenerationStampV2());
+ out.writeLong(sourceNamesystem.getGenerationStampAtblockIdSwitch());
+ out.writeLong(sourceNamesystem.getLastAllocatedBlockId());
+ out.writeLong(context.getTxId());
+ out.writeLong(sourceNamesystem.getLastInodeId());
+
+
+ sourceNamesystem.getSnapshotManager().write(out);
+
+ // write compression info and set up compressed stream
+ out = compression.writeHeaderAndWrapStream(fos);
+ LOG.info("Saving image file " + newFile +
+ " using " + compression);
+
+ // save the root
+ saveINode2Image(rootDir, out, false, referenceMap, counter);
+ // save the rest of the nodes
+ saveImage(rootDir, out, true, false, counter);
+ prog.endStep(Phase.SAVING_CHECKPOINT, step);
+ // Now that the step is finished, set counter equal to total to adjust
+ // for possible under-counting due to reference inodes.
+ prog.setCount(Phase.SAVING_CHECKPOINT, step, numINodes);
+ // save files under construction
+ // TODO: for HDFS-5428, since we cannot break the compatibility of
+ // fsimage, we store part of the under-construction files that are only
+ // in snapshots in this "under-construction-file" section. As a
+ // temporary solution, we use "/.reserved/.inodes/<inodeid>" as their
+ // paths, so that when loading fsimage we do not put them into the lease
+ // map. In the future, we can remove this hack when we can bump the
+ // layout version.
+ sourceNamesystem.saveFilesUnderConstruction(out, snapshotUCMap);
+
+ context.checkCancelled();
+ sourceNamesystem.saveSecretManagerStateCompat(out, sdPath);
+ context.checkCancelled();
+ sourceNamesystem.getCacheManager().saveStateCompat(out, sdPath);
+ context.checkCancelled();
+ out.flush();
+ context.checkCancelled();
+ fout.getChannel().force(true);
+ } finally {
+ out.close();
+ }
+
+ saved = true;
+ // set md5 of the saved image
+ savedDigest = new MD5Hash(digester.digest());
+
+ LOG.info("Image file " + newFile + " of size " + newFile.length() +
+ " bytes saved in " + (now() - startTime)/1000 + " seconds.");
+ }
+
+ /**
+ * Save children INodes.
+ * @param children The list of children INodes
+ * @param out The DataOutputStream to write
+ * @param inSnapshot Whether the parent directory or its ancestor is in
+ * the deleted list of some snapshot (caused by rename or
+ * deletion)
+ * @param counter Counter to increment for namenode startup progress
+ * @return Number of children that are directory
+ */
+ private int saveChildren(ReadOnlyList<INode> children,
+ DataOutputStream out, boolean inSnapshot, Counter counter)
+ throws IOException {
+ // Write normal children INode.
+ out.writeInt(children.size());
+ int dirNum = 0;
+ int i = 0;
+ for(INode child : children) {
+ // print all children first
+ // TODO: for HDFS-5428, we cannot change the format/content of fsimage
+ // here, thus even if the parent directory is in snapshot, we still
+ // do not handle INodeUC as those stored in deleted list
+ saveINode2Image(child, out, false, referenceMap, counter);
+ if (child.isDirectory()) {
+ dirNum++;
+ } else if (inSnapshot && child.isFile()
+ && child.asFile().isUnderConstruction()) {
+ this.snapshotUCMap.put(child.getId(), child.asFile());
+ }
+ if (i++ % 50 == 0) {
+ context.checkCancelled();
+ }
+ }
+ return dirNum;
+ }
+
+ /**
+ * Save file tree image starting from the given root.
+ * This is a recursive procedure, which first saves all children and
+ * snapshot diffs of a current directory and then moves inside the
+ * sub-directories.
+ *
+ * @param current The current node
+ * @param out The DataoutputStream to write the image
+ * @param toSaveSubtree Whether or not to save the subtree to fsimage. For
+ * reference node, its subtree may already have been
+ * saved before.
+ * @param inSnapshot Whether the current directory is in snapshot
+ * @param counter Counter to increment for namenode startup progress
+ */
+ private void saveImage(INodeDirectory current, DataOutputStream out,
+ boolean toSaveSubtree, boolean inSnapshot, Counter counter)
+ throws IOException {
+ // write the inode id of the directory
+ out.writeLong(current.getId());
+
+ if (!toSaveSubtree) {
+ return;
+ }
+
+ final ReadOnlyList<INode> children = current
+ .getChildrenList(Snapshot.CURRENT_STATE_ID);
+ int dirNum = 0;
+ List<INodeDirectory> snapshotDirs = null;
+ DirectoryWithSnapshotFeature sf = current.getDirectoryWithSnapshotFeature();
+ if (sf != null) {
+ snapshotDirs = new ArrayList<INodeDirectory>();
+ sf.getSnapshotDirectory(snapshotDirs);
+ dirNum += snapshotDirs.size();
+ }
+
+ // 2. Write INodeDirectorySnapshottable#snapshotsByNames to record all
+ // Snapshots
+ if (current.isDirectory() && current.asDirectory().isSnapshottable()) {
+ SnapshotFSImageFormat.saveSnapshots(current.asDirectory(), out);
+ } else {
+ out.writeInt(-1); // # of snapshots
+ }
+
+ // 3. Write children INode
+ dirNum += saveChildren(children, out, inSnapshot, counter);
+
+ // 4. Write DirectoryDiff lists, if there is any.
+ SnapshotFSImageFormat.saveDirectoryDiffList(current, out, referenceMap);
+
+ // Write sub-tree of sub-directories, including possible snapshots of
+ // deleted sub-directories
+ out.writeInt(dirNum); // the number of sub-directories
+ for(INode child : children) {
+ if(!child.isDirectory()) {
+ continue;
+ }
+ // make sure we only save the subtree under a reference node once
+ boolean toSave = child.isReference() ?
+ referenceMap.toProcessSubtree(child.getId()) : true;
+ saveImage(child.asDirectory(), out, toSave, inSnapshot, counter);
+ }
+ if (snapshotDirs != null) {
+ for (INodeDirectory subDir : snapshotDirs) {
+ // make sure we only save the subtree under a reference node once
+ boolean toSave = subDir.getParentReference() != null ?
+ referenceMap.toProcessSubtree(subDir.getId()) : true;
+ saveImage(subDir, out, toSave, true, counter);
+ }
+ }
+ }
+
+ /**
+ * Saves inode and increments progress counter.
+ *
+ * @param inode INode to save
+ * @param out DataOutputStream to receive inode
+ * @param writeUnderConstruction boolean true if this is under construction
+ * @param referenceMap ReferenceMap containing reference inodes
+ * @param counter Counter to increment for namenode startup progress
+ * @throws IOException thrown if there is an I/O error
+ */
+ private void saveINode2Image(INode inode, DataOutputStream out,
+ boolean writeUnderConstruction, ReferenceMap referenceMap,
+ Counter counter) throws IOException {
+ FSImageSerialization.saveINode2Image(inode, out, writeUnderConstruction,
+ referenceMap);
+ // Intentionally do not increment counter for reference inodes, because it
+ // is too difficult at this point to assess whether or not this is a
+ // reference that counts toward quota.
+ if (!(inode instanceof INodeReference)) {
+ counter.increment();
+ }
+ }
+ }
}
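
The deprecated Saver reinstated above keeps writing the pre-protobuf image layout so tools like the legacy offline image viewer (OIV) still have input. One detail worth noting in save() is the checksum wiring: an MD5 digest accumulates inside a DigestOutputStream sitting between the file stream and the DataOutputStream, so the checksum is computed in a single pass while writing. A self-contained sketch of that pattern (file name and payload are placeholders, not the real image format):

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;

    public class DigestStreamDemo {
      public static void main(String[] args) throws Exception {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        FileOutputStream fout = new FileOutputStream("image.demo"); // placeholder file
        DigestOutputStream fos = new DigestOutputStream(fout, digester);
        DataOutputStream out = new DataOutputStream(fos);
        try {
          out.writeInt(-51);             // LAYOUT_VERSION, as in Saver
          out.writeLong(42L);            // placeholder payload
          out.flush();
          fout.getChannel().force(true); // fsync, mirroring Saver#save
        } finally {
          out.close();
        }
        byte[] md5 = digester.digest(); // checksum of everything written
        System.out.println("MD5 is " + md5.length + " bytes");
      }
    }
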
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Tue Aug 19 23:49:39 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.permission.A
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
@@ -49,6 +50,8 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.AclFeatureProto;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrCompactProto;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrFeatureProto;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -74,6 +77,14 @@ public final class FSImageFormatPBINode
.values();
private static final AclEntryType[] ACL_ENTRY_TYPE_VALUES = AclEntryType
.values();
+
+ private static final int XATTR_NAMESPACE_MASK = 3;
+ private static final int XATTR_NAMESPACE_OFFSET = 30;
+ private static final int XATTR_NAME_MASK = (1 << 24) - 1;
+ private static final int XATTR_NAME_OFFSET = 6;
+ private static final XAttr.NameSpace[] XATTR_NAMESPACE_VALUES =
+ XAttr.NameSpace.values();
+
private static final Log LOG = LogFactory.getLog(FSImageFormatPBINode.class);
@@ -103,6 +114,25 @@ public final class FSImageFormatPBINode
}
return b.build();
}
+
+ public static ImmutableList<XAttr> loadXAttrs(
+ XAttrFeatureProto proto, final String[] stringTable) {
+ ImmutableList.Builder<XAttr> b = ImmutableList.builder();
+ for (XAttrCompactProto xAttrCompactProto : proto.getXAttrsList()) {
+ int v = xAttrCompactProto.getName();
+ int nid = (v >> XATTR_NAME_OFFSET) & XATTR_NAME_MASK;
+ int ns = (v >> XATTR_NAMESPACE_OFFSET) & XATTR_NAMESPACE_MASK;
+ String name = stringTable[nid];
+ byte[] value = null;
+ if (xAttrCompactProto.getValue() != null) {
+ value = xAttrCompactProto.getValue().toByteArray();
+ }
+ b.add(new XAttr.Builder().setNameSpace(XATTR_NAMESPACE_VALUES[ns])
+ .setName(name).setValue(value).build());
+ }
+
+ return b.build();
+ }
public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
LoaderContext state) {
@@ -123,6 +153,10 @@ public final class FSImageFormatPBINode
dir.addAclFeature(new AclFeature(loadAclEntries(d.getAcl(),
state.getStringTable())));
}
+ if (d.hasXAttrs()) {
+ dir.addXAttrFeature(new XAttrFeature(
+ loadXAttrs(d.getXAttrs(), state.getStringTable())));
+ }
return dir;
}
@@ -255,12 +289,16 @@ public final class FSImageFormatPBINode
file.addAclFeature(new AclFeature(loadAclEntries(f.getAcl(),
state.getStringTable())));
}
+
+ if (f.hasXAttrs()) {
+ file.addXAttrFeature(new XAttrFeature(
+ loadXAttrs(f.getXAttrs(), state.getStringTable())));
+ }
// under-construction information
if (f.hasFileUC()) {
INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
- file.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
- null);
+ file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
if (blocks.length > 0) {
BlockInfo lastBlk = file.getLastBlock();
// replace the last block of file
@@ -295,6 +333,11 @@ public final class FSImageFormatPBINode
}
dir.rootDir.cloneModificationTime(root);
dir.rootDir.clonePermissionStatus(root);
+    // the root directory may carry extended attributes, as POSIX allows
+ final XAttrFeature f = root.getXAttrFeature();
+ if (f != null) {
+ dir.rootDir.addXAttrFeature(f);
+ }
}
}
@@ -320,6 +363,26 @@ public final class FSImageFormatPBINode
}
return b;
}
+
+ private static XAttrFeatureProto.Builder buildXAttrs(XAttrFeature f,
+ final SaverContext.DeduplicationMap<String> stringMap) {
+ XAttrFeatureProto.Builder b = XAttrFeatureProto.newBuilder();
+ for (XAttr a : f.getXAttrs()) {
+ XAttrCompactProto.Builder xAttrCompactBuilder = XAttrCompactProto.
+ newBuilder();
+ int v = ((a.getNameSpace().ordinal() & XATTR_NAMESPACE_MASK) <<
+ XATTR_NAMESPACE_OFFSET)
+ | ((stringMap.getId(a.getName()) & XATTR_NAME_MASK) <<
+ XATTR_NAME_OFFSET);
+ xAttrCompactBuilder.setName(v);
+ if (a.getValue() != null) {
+ xAttrCompactBuilder.setValue(PBHelper.getByteString(a.getValue()));
+ }
+ b.addXAttrs(xAttrCompactBuilder.build());
+ }
+
+ return b;
+ }
public static INodeSection.INodeFile.Builder buildINodeFile(
INodeFileAttributes file, final SaverContext state) {
@@ -334,6 +397,10 @@ public final class FSImageFormatPBINode
if (f != null) {
b.setAcl(buildAclEntries(f, state.getStringMap()));
}
+ XAttrFeature xAttrFeature = file.getXAttrFeature();
+ if (xAttrFeature != null) {
+ b.setXAttrs(buildXAttrs(xAttrFeature, state.getStringMap()));
+ }
return b;
}
@@ -350,6 +417,10 @@ public final class FSImageFormatPBINode
if (f != null) {
b.setAcl(buildAclEntries(f, state.getStringMap()));
}
+ XAttrFeature xAttrFeature = dir.getXAttrFeature();
+ if (xAttrFeature != null) {
+ b.setXAttrs(buildXAttrs(xAttrFeature, state.getStringMap()));
+ }
return b;
}
@@ -460,8 +531,10 @@ public final class FSImageFormatPBINode
INodeSection.INodeFile.Builder b = buildINodeFile(n,
parent.getSaverContext());
- for (Block block : n.getBlocks()) {
- b.addBlocks(PBHelper.convert(block));
+ if (n.getBlocks() != null) {
+ for (Block block : n.getBlocks()) {
+ b.addBlocks(PBHelper.convert(block));
+ }
}
FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
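
A note on the packed form used by loadXAttrs/buildXAttrs above: the namespace
and the name id travel in a single int. Below is a minimal round-trip sketch
under the same masks and offsets; the namespace ordinal (1) and string-table
id (42) are assumed values for illustration, not taken from the commit:

    public class XAttrPackSketch {
      // mirrors the XATTR_* constants added above
      static final int NS_MASK = 3;               // 2 namespace bits
      static final int NS_OFFSET = 30;
      static final int NAME_MASK = (1 << 24) - 1; // 24 name-id bits
      static final int NAME_OFFSET = 6;

      public static void main(String[] args) {
        int ns = 1;   // assumed namespace ordinal
        int nid = 42; // hypothetical string-table id of the xattr name

        // pack, as buildXAttrs does
        int v = ((ns & NS_MASK) << NS_OFFSET)
            | ((nid & NAME_MASK) << NAME_OFFSET);

        // unpack, as loadXAttrs does; the masks strip sign-extended bits
        int outNid = (v >> NAME_OFFSET) & NAME_MASK;
        int outNs = (v >> NS_OFFSET) & NS_MASK;

        System.out.printf("packed=0x%08x ns=%d nid=%d%n", v, outNs, outNid);
      }
    }

With namespace ordinals 2 or 3 the packed int goes negative (bit 31 set), and
the arithmetic shift then sign-extends, which is why both decode paths mask
after shifting.
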
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Tue Aug 19 23:49:39 2014
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.util.MD5Fi
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.util.Time;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -160,13 +161,13 @@ public final class FSImageFormatProtobuf
}
void load(File file) throws IOException {
- long start = System.currentTimeMillis();
+ long start = Time.monotonicNow();
imgDigest = MD5FileUtils.computeMd5ForFile(file);
RandomAccessFile raFile = new RandomAccessFile(file, "r");
FileInputStream fin = new FileInputStream(file);
try {
loadInternal(raFile, fin);
- long end = System.currentTimeMillis();
+ long end = Time.monotonicNow();
LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds.");
} finally {
fin.close();
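
The two timing lines above move from the wall clock to a monotonic clock, so
NTP steps or manual clock changes cannot produce negative or inflated load
times. A standalone sketch of the same pattern on the JDK primitive (Hadoop's
Time.monotonicNow() is, as far as this reading goes, System.nanoTime() scaled
to milliseconds):

    public class MonotonicTimingSketch {
      public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();  // monotonic; immune to clock steps
        Thread.sleep(250);               // stand-in for the image load
        long elapsedMs = (System.nanoTime() - start) / 1000000L;
        System.out.println("Loaded in " + (elapsedMs / 1000) + " seconds.");
      }
    }
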
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java Tue Aug 19 23:49:39 2014
@@ -263,7 +263,7 @@ class FSImagePreTransactionalStorageInsp
// the image is already current, discard edits
LOG.debug(
"Name checkpoint time is newer than edits, not loading edits.");
- return Collections.<File>emptyList();
+ return Collections.emptyList();
}
return getEditsInStorageDir(latestEditsSD);
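
The hunk above drops a redundant type witness: in a return statement the
compiler infers the element type from the declared return type, so
Collections.<File>emptyList() and Collections.emptyList() compile to the same
thing. A minimal sketch:

    import java.io.File;
    import java.util.Collections;
    import java.util.List;

    public class EmptyListSketch {
      // <File> is inferred from the return type; no explicit witness needed
      static List<File> noEdits() {
        return Collections.emptyList();
      }

      public static void main(String[] args) {
        System.out.println(noEdits().isEmpty()); // prints true
      }
    }
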
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
@@ -31,20 +36,20 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import com.google.common.base.Preconditions;
/**
* Static utility functions for serializing various pieces of data in the correct
@@ -82,6 +87,26 @@ public class FSImageSerialization {
final ShortWritable U_SHORT = new ShortWritable();
final IntWritable U_INT = new IntWritable();
final LongWritable U_LONG = new LongWritable();
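+  // scratch object, fetched through the thread-local below so the image
+  // saver does not allocate a new FsPermission for every inode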
+ final FsPermission FILE_PERM = new FsPermission((short) 0);
+ }
+
+ private static void writePermissionStatus(INodeAttributes inode,
+ DataOutput out) throws IOException {
+ final FsPermission p = TL_DATA.get().FILE_PERM;
+ p.fromShort(inode.getFsPermissionShort());
+ PermissionStatus.write(out, inode.getUserName(), inode.getGroupName(), p);
+ }
+
+ private static void writeBlocks(final Block[] blocks,
+ final DataOutput out) throws IOException {
+ if (blocks == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(blocks.length);
+ for (Block blk : blocks) {
+ blk.write(out);
+ }
+ }
}
// Helper function that reads in an INodeUnderConstruction
@@ -123,10 +148,187 @@ public class FSImageSerialization {
INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
modificationTime, blocks, blockReplication, preferredBlockSize);
- file.toUnderConstruction(clientName, clientMachine, null);
+ file.toUnderConstruction(clientName, clientMachine);
return file;
}
+ // Helper function that writes an INodeUnderConstruction
+  // into the output stream
+ //
+ static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
+ String path) throws IOException {
+ writeString(path, out);
+ out.writeLong(cons.getId());
+ out.writeShort(cons.getFileReplication());
+ out.writeLong(cons.getModificationTime());
+ out.writeLong(cons.getPreferredBlockSize());
+
+ writeBlocks(cons.getBlocks(), out);
+ cons.getPermissionStatus().write(out);
+
+ FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
+ writeString(uc.getClientName(), out);
+ writeString(uc.getClientMachine(), out);
+
+ out.writeInt(0); // do not store locations of last block
+ }
+
+ /**
+ * Serialize a {@link INodeFile} node
+   * @param file The node to write
+   * @param out The {@link DataOutput} where the fields are written
+   * @param writeUnderConstruction Whether to write under-construction
+   *          information
+ */
+ public static void writeINodeFile(INodeFile file, DataOutput out,
+ boolean writeUnderConstruction) throws IOException {
+ writeLocalName(file, out);
+ out.writeLong(file.getId());
+ out.writeShort(file.getFileReplication());
+ out.writeLong(file.getModificationTime());
+ out.writeLong(file.getAccessTime());
+ out.writeLong(file.getPreferredBlockSize());
+
+ writeBlocks(file.getBlocks(), out);
+ SnapshotFSImageFormat.saveFileDiffList(file, out);
+
+ if (writeUnderConstruction) {
+ if (file.isUnderConstruction()) {
+ out.writeBoolean(true);
+ final FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+ writeString(uc.getClientName(), out);
+ writeString(uc.getClientMachine(), out);
+ } else {
+ out.writeBoolean(false);
+ }
+ }
+
+ writePermissionStatus(file, out);
+ }
+
+ /** Serialize an {@link INodeFileAttributes}. */
+ public static void writeINodeFileAttributes(INodeFileAttributes file,
+ DataOutput out) throws IOException {
+ writeLocalName(file, out);
+ writePermissionStatus(file, out);
+ out.writeLong(file.getModificationTime());
+ out.writeLong(file.getAccessTime());
+
+ out.writeShort(file.getFileReplication());
+ out.writeLong(file.getPreferredBlockSize());
+ }
+
+ private static void writeQuota(Quota.Counts quota, DataOutput out)
+ throws IOException {
+ out.writeLong(quota.get(Quota.NAMESPACE));
+ out.writeLong(quota.get(Quota.DISKSPACE));
+ }
+
+ /**
+ * Serialize a {@link INodeDirectory}
+ * @param node The node to write
+ * @param out The {@link DataOutput} where the fields are written
+ */
+ public static void writeINodeDirectory(INodeDirectory node, DataOutput out)
+ throws IOException {
+ writeLocalName(node, out);
+ out.writeLong(node.getId());
+ out.writeShort(0); // replication
+ out.writeLong(node.getModificationTime());
+ out.writeLong(0); // access time
+ out.writeLong(0); // preferred block size
+ out.writeInt(-1); // # of blocks
+
+ writeQuota(node.getQuotaCounts(), out);
+
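+    // a single 'true' marks a snapshottable directory; otherwise write
+    // 'false' followed by whether the directory is part of a snapshot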
+ if (node.isSnapshottable()) {
+ out.writeBoolean(true);
+ } else {
+ out.writeBoolean(false);
+ out.writeBoolean(node.isWithSnapshot());
+ }
+
+ writePermissionStatus(node, out);
+ }
+
+ /**
+   * Serialize an {@link INodeDirectoryAttributes}
+   * @param a The attributes to write
+ * @param out The {@link DataOutput} where the fields are written
+ */
+ public static void writeINodeDirectoryAttributes(
+ INodeDirectoryAttributes a, DataOutput out) throws IOException {
+ writeLocalName(a, out);
+ writePermissionStatus(a, out);
+ out.writeLong(a.getModificationTime());
+ writeQuota(a.getQuotaCounts(), out);
+ }
+
+ /**
+ * Serialize a {@link INodeSymlink} node
+ * @param node The node to write
+ * @param out The {@link DataOutput} where the fields are written
+ */
+ private static void writeINodeSymlink(INodeSymlink node, DataOutput out)
+ throws IOException {
+ writeLocalName(node, out);
+ out.writeLong(node.getId());
+ out.writeShort(0); // replication
+ out.writeLong(0); // modification time
+ out.writeLong(0); // access time
+ out.writeLong(0); // preferred block size
+ out.writeInt(-2); // # of blocks
+
+ Text.writeString(out, node.getSymlinkString());
+ writePermissionStatus(node, out);
+ }
+
+ /** Serialize a {@link INodeReference} node */
+ private static void writeINodeReference(INodeReference ref, DataOutput out,
+ boolean writeUnderConstruction, ReferenceMap referenceMap
+ ) throws IOException {
+ writeLocalName(ref, out);
+ out.writeLong(ref.getId());
+ out.writeShort(0); // replication
+ out.writeLong(0); // modification time
+ out.writeLong(0); // access time
+ out.writeLong(0); // preferred block size
+ out.writeInt(-3); // # of blocks
+
+ final boolean isWithName = ref instanceof INodeReference.WithName;
+ out.writeBoolean(isWithName);
+
+ if (!isWithName) {
+ Preconditions.checkState(ref instanceof INodeReference.DstReference);
+ // dst snapshot id
+ out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+ } else {
+ out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId());
+ }
+
+ final INodeReference.WithCount withCount
+ = (INodeReference.WithCount)ref.getReferredINode();
+ referenceMap.writeINodeReferenceWithCount(withCount, out,
+ writeUnderConstruction);
+ }
+
+ /**
+ * Save one inode's attributes to the image.
+ */
+ public static void saveINode2Image(INode node, DataOutput out,
+ boolean writeUnderConstruction, ReferenceMap referenceMap)
+ throws IOException {
+ if (node.isReference()) {
+ writeINodeReference(node.asReference(), out, writeUnderConstruction,
+ referenceMap);
+ } else if (node.isDirectory()) {
+ writeINodeDirectory(node.asDirectory(), out);
+ } else if (node.isSymlink()) {
+ writeINodeSymlink(node.asSymlink(), out);
+ } else if (node.isFile()) {
+ writeINodeFile(node.asFile(), out, writeUnderConstruction);
+ }
+ }
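
The writers above reuse the "# of blocks" slot as a type tag: a real count
(>= 0) means file, -1 directory, -2 symlink, -3 reference. A sketch of how a
reader could dispatch on that sentinel; the helper and enum names are
hypothetical, not the actual loader API:

    public class INodeKindSketch {
      enum Kind { FILE, DIRECTORY, SYMLINK, REFERENCE }

      // mirrors the -1/-2/-3 sentinels written by the serializers above
      static Kind kindOf(int numBlocks) {
        if (numBlocks >= 0)  return Kind.FILE;     // real block count
        if (numBlocks == -1) return Kind.DIRECTORY;
        if (numBlocks == -2) return Kind.SYMLINK;
        if (numBlocks == -3) return Kind.REFERENCE;
        throw new IllegalArgumentException("bad block count: " + numBlocks);
      }

      public static void main(String[] args) {
        System.out.println(kindOf(3));  // FILE
        System.out.println(kindOf(-2)); // SYMLINK
      }
    }
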
+
// This should be reverted to package private once the ImageLoader
// code is moved into this package. This method should not be called
// by other code.
@@ -206,7 +408,7 @@ public class FSImageSerialization {
/**
* Reading the path from the image and converting it to byte[][] directly
* this saves us an array copy and conversions to and from String
- * @param in
+ * @param in input to read from
* @return the array each element of which is a byte[] representation
* of a path component
* @throws IOException
@@ -226,6 +428,12 @@ public class FSImageSerialization {
in.readFully(createdNodeName);
return createdNodeName;
}
+
+ private static void writeLocalName(INodeAttributes inode, DataOutput out)
+ throws IOException {
+ final byte[] name = inode.getLocalNameBytes();
+ writeBytes(name, out);
+ }
public static void writeBytes(byte[] data, DataOutput out)
throws IOException {