You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:59 UTC
svn commit: r1398581 [3/9] - in
/hadoop/common/branches/MR-3902/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs-h...
Propchange: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1363593-1396941
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Tue Oct 16 00:02:55 2012
@@ -312,9 +312,6 @@ public class Hdfs extends AbstractFileSy
return listing.toArray(new FileStatus[listing.size()]);
}
- /**
- * {@inheritDoc}
- */
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException {
@@ -324,7 +321,7 @@ public class Hdfs extends AbstractFileSy
@Override
public void mkdir(Path dir, FsPermission permission, boolean createParent)
throws IOException, UnresolvedLinkException {
- dfs.mkdirs(getUriPath(dir), permission, createParent);
+ dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
}
@SuppressWarnings("deprecation")
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java Tue Oct 16 00:02:55 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.fs;
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,10 +32,10 @@ import org.apache.hadoop.classification.
@InterfaceAudience.Public
public class HdfsVolumeId implements VolumeId {
- private final byte id;
+ private final byte[] id;
private final boolean isValid;
- public HdfsVolumeId(byte id, boolean isValid) {
+ public HdfsVolumeId(byte[] id, boolean isValid) {
this.id = id;
this.isValid = isValid;
}
@@ -69,6 +70,6 @@ public class HdfsVolumeId implements Vol
@Override
public String toString() {
- return Byte.toString(id);
+ return Base64.encodeBase64String(id);
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java Tue Oct 16 00:02:55 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -201,7 +202,7 @@ class BlockStorageLocationUtil {
ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
// Start off all IDs as invalid, fill it in later with results from RPCs
for (int i = 0; i < b.getLocations().length; i++) {
- l.add(new HdfsVolumeId((byte)-1, false));
+ l.add(new HdfsVolumeId(null, false));
}
blockVolumeIds.put(b, l);
}
@@ -234,8 +235,8 @@ class BlockStorageLocationUtil {
}
// Get the VolumeId by indexing into the list of VolumeIds
// provided by the datanode
- HdfsVolumeId id = new HdfsVolumeId(metaVolumeIds.get(volumeIndex)[0],
- true);
+ byte[] volumeId = metaVolumeIds.get(volumeIndex);
+ HdfsVolumeId id = new HdfsVolumeId(volumeId, true);
// Find out which index we are in the LocatedBlock's replicas
LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
DatanodeInfo[] dnInfos = locBlock.getLocations();
@@ -255,8 +256,8 @@ class BlockStorageLocationUtil {
}
// Place VolumeId at the same index as the DN's index in the list of
// replicas
- List<VolumeId> VolumeIds = blockVolumeIds.get(locBlock);
- VolumeIds.set(index, id);
+ List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+ volumeIds.set(index, id);
}
}
return blockVolumeIds;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Oct 16 00:02:55 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -77,6 +79,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@@ -89,7 +92,9 @@ import org.apache.hadoop.fs.FsServerDefa
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -206,6 +211,7 @@ public class DFSClient implements java.i
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
+ final long socketCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
@@ -254,6 +260,8 @@ public class DFSClient implements java.i
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
@@ -424,7 +432,7 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
- this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+ this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
/**
@@ -531,7 +539,7 @@ public class DFSClient implements java.i
* until the first output stream is created. The same instance will
* be returned until all output streams are closed.
*/
- public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+ public LeaseRenewer getLeaseRenewer() throws IOException {
return LeaseRenewer.getInstance(authority, ugi, this);
}
@@ -638,7 +646,6 @@ public class DFSClient implements java.i
void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
- socketCache.clear();
try {
// remove reference to this client and stop the renewer,
@@ -685,7 +692,6 @@ public class DFSClient implements java.i
public synchronized void close() throws IOException {
if(clientRunning) {
closeAllFilesBeingWritten(false);
- socketCache.clear();
clientRunning = false;
getLeaseRenewer().closeClient(this);
// close connections to the namenode
@@ -694,6 +700,17 @@ public class DFSClient implements java.i
}
/**
+ * Close all open streams, abandoning all of the leases and files being
+ * created.
+ * @param abort whether streams should be aborted instead of gracefully closed
+ */
+ public void closeOutputStreams(boolean abort) {
+ if (clientRunning) {
+ closeAllFilesBeingWritten(abort);
+ }
+ }
+
+ /**
* Get the default block size for this cluster
* @return the default block size in bytes
*/
@@ -1603,7 +1620,8 @@ public class DFSClient implements java.i
}
List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
final DataOutputBuffer md5out = new DataOutputBuffer();
- int bytesPerCRC = 0;
+ int bytesPerCRC = -1;
+ DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
long crcPerBlock = 0;
boolean refetchBlocks = false;
int lastRetriedIndex = -1;
@@ -1707,6 +1725,17 @@ public class DFSClient implements java.i
checksumData.getMd5().toByteArray());
md5.write(md5out);
+ // read crc-type
+ final DataChecksum.Type ct = HdfsProtoUtil.
+ fromProto(checksumData.getCrcType());
+ if (i == 0) { // first block
+ crcType = ct;
+ } else if (crcType != DataChecksum.Type.MIXED
+ && crcType != ct) {
+ // if crc types are mixed in a file
+ crcType = DataChecksum.Type.MIXED;
+ }
+
done = true;
if (LOG.isDebugEnabled()) {
@@ -1732,7 +1761,18 @@ public class DFSClient implements java.i
//compute file MD5
final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
- return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+ switch (crcType) {
+ case CRC32:
+ return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ case CRC32C:
+ return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ default:
+ // we should never get here since the validity was checked
+ // when getCrcType() was called above.
+ return null;
+ }
}
/**
@@ -1845,6 +1885,20 @@ public class DFSClient implements java.i
throw re.unwrapRemoteException(AccessControlException.class);
}
}
+
+ /**
+ * Rolls the edit log on the active NameNode.
+ * @return the txid of the new log segment
+ *
+ * @see ClientProtocol#rollEdits()
+ */
+ long rollEdits() throws AccessControlException, IOException {
+ try {
+ return namenode.rollEdits();
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class);
+ }
+ }
/**
* enable/disable restore failed storage.
@@ -1917,34 +1971,29 @@ public class DFSClient implements java.i
*/
public boolean mkdirs(String src, FsPermission permission,
boolean createParent) throws IOException {
- checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
- if(LOG.isDebugEnabled()) {
- LOG.debug(src + ": masked=" + masked);
- }
- try {
- return namenode.mkdirs(src, masked, createParent);
- } catch(RemoteException re) {
- throw re.unwrapRemoteException(AccessControlException.class,
- InvalidPathException.class,
- FileAlreadyExistsException.class,
- FileNotFoundException.class,
- ParentNotDirectoryException.class,
- SafeModeException.class,
- NSQuotaExceededException.class,
- UnresolvedPathException.class);
- }
+ return primitiveMkdir(src, masked, createParent);
}
-
+
/**
* Same {{@link #mkdirs(String, FsPermission, boolean)} except
* that the permissions has already been masked against umask.
*/
public boolean primitiveMkdir(String src, FsPermission absPermission)
throws IOException {
+ return primitiveMkdir(src, absPermission, true);
+ }
+
+ /**
+ * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+ * that the permissions have already been masked against umask.
+ */
+ public boolean primitiveMkdir(String src, FsPermission absPermission,
+ boolean createParent)
+ throws IOException {
checkOpen();
if (absPermission == null) {
absPermission =
@@ -1955,15 +2004,20 @@ public class DFSClient implements java.i
LOG.debug(src + ": masked=" + absPermission);
}
try {
- return namenode.mkdirs(src, absPermission, true);
+ return namenode.mkdirs(src, absPermission, createParent);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
+ InvalidPathException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ SafeModeException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
}
}
-
+
/**
* Get {@link ContentSummary} rooted at the specified directory.
* @param path The string representation of the path
@@ -2035,10 +2089,7 @@ public class DFSClient implements java.i
}
boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
- if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
- return true;
- }
- return false;
+ return shortCircuitLocalReads && isLocalAddress(targetAddr);
}
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Oct 16 00:02:55 2012
@@ -74,6 +74,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
public static final int DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
+ public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+ public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -174,6 +176,25 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+
+ // Whether to enable datanode's stale state detection and usage
+ public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
+ public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+ // Whether to avoid writing to datanodes that are marked as stale
+ public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+ public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+ // The default value of the time interval for marking datanodes as stale
+ public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+ public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+ // The stale interval cannot be too small, since that would cause overly frequent churn of stale states.
+ // This value is the number of heartbeat intervals that defines the minimum stale interval.
+ public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+ public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+
+ // When the number of datanodes marked as stale reaches this certain ratio,
+ // stop avoiding writing to stale nodes so as to prevent causing hotspots.
+ public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+ public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
// Replication monitoring related keys
public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
@@ -329,6 +350,10 @@ public class DFSConfigKeys extends Commo
"dfs.image.transfer.bandwidthPerSec";
public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
+ // Image transfer timeout
+ public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
+ public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
@@ -382,4 +407,42 @@ public class DFSConfigKeys extends Commo
public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
+
+ // Journal-node related configs. These are read on the JN side.
+ public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+ public static final String DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+ public static final String DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+ public static final int DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+ public static final String DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+
+ public static final String DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+ public static final int DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+ public static final String DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+ public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+ public static final String DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+ public static final String DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+ // Journal-node related configs for the client side.
+ public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+ public static final int DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+
+ // Quorum-journal timeouts for various operations. Unlikely to need
+ // to be tweaked, but configurable just in case.
+ public static final String DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+ public static final String DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+ public static final String DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+ public static final String DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+ public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+ public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+ public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+ public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
+ public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+ public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+ public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Oct 16 00:02:55 2012
@@ -243,6 +243,10 @@ public class DFSInputStream extends FSIn
locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
+ private synchronized boolean blockUnderConstruction() {
+ return locatedBlocks.isUnderConstruction();
+ }
+
/**
* Returns the datanode from which the stream is currently reading.
*/
@@ -878,7 +882,9 @@ public class DFSInputStream extends FSIn
String clientName)
throws IOException {
- if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
+ // Can't local read a block under construction, see HDFS-2757
+ if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
+ !blockUnderConstruction()) {
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
dfsClient.connectToDnViaHostname());
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Oct 16 00:02:55 2012
@@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -107,8 +107,8 @@ import com.google.common.annotations.Vis
****************************************************************/
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer implements Syncable {
- private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+ private final DFSClient dfsClient;
private Socket s;
// closed is accessed by different threads under different locks.
private volatile boolean closed = false;
@@ -138,15 +138,15 @@ public class DFSOutputStream extends FSO
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
- private class Packet {
- long seqno; // sequencenumber of buffer in block
- long offsetInBlock; // offset in block
- private boolean lastPacketInBlock; // is this the last packet in block?
- boolean syncBlock; // this packet forces the current block to disk
- int numChunks; // number of chunks currently in packet
- int maxChunks; // max chunks in packet
-
+ private static class Packet {
+ private static final long HEART_BEAT_SEQNO = -1L;
+ long seqno; // sequencenumber of buffer in block
+ final long offsetInBlock; // offset in block
+ boolean syncBlock; // this packet forces the current block to disk
+ int numChunks; // number of chunks currently in packet
+ final int maxChunks; // max chunks in packet
byte[] buf;
+ private boolean lastPacketInBlock; // is this the last packet in block?
/**
* buf is pointed into like follows:
@@ -164,45 +164,36 @@ public class DFSOutputStream extends FSO
*/
int checksumStart;
int checksumPos;
- int dataStart;
+ final int dataStart;
int dataPos;
- private static final long HEART_BEAT_SEQNO = -1L;
-
/**
* Create a heartbeat packet.
*/
- Packet() {
- this.lastPacketInBlock = false;
- this.numChunks = 0;
- this.offsetInBlock = 0;
- this.seqno = HEART_BEAT_SEQNO;
-
- buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-
- checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
- maxChunks = 0;
+ Packet(int checksumSize) {
+ this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
}
/**
* Create a new packet.
*
- * @param pktSize maximum size of the packet, including checksum data and actual data.
+ * @param pktSize maximum size of the packet,
+ * including checksum data and actual data.
* @param chunksPerPkt maximum number of chunks per packet.
* @param offsetInBlock offset in bytes into the HDFS block.
*/
- Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+ Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
+ long seqno, int checksumSize) {
this.lastPacketInBlock = false;
this.numChunks = 0;
this.offsetInBlock = offsetInBlock;
- this.seqno = currentSeqno;
- currentSeqno++;
+ this.seqno = seqno;
buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
- dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+ dataStart = checksumStart + (chunksPerPkt * checksumSize);
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
@@ -412,6 +403,7 @@ public class DFSOutputStream extends FSO
response.join();
response = null;
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
}
@@ -439,6 +431,7 @@ public class DFSOutputStream extends FSO
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.now();
@@ -448,7 +441,7 @@ public class DFSOutputStream extends FSO
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
- one = new Packet(); // heartbeat packet
+ one = new Packet(checksum.getChecksumSize()); // heartbeat packet
} else {
one = dataQueue.getFirst(); // regular data packet
}
@@ -488,6 +481,7 @@ public class DFSOutputStream extends FSO
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
}
}
}
@@ -518,7 +512,7 @@ public class DFSOutputStream extends FSO
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
- // write to primary DN
+ // write to primary DN
errorIndex = 0;
throw e;
}
@@ -607,6 +601,7 @@ public class DFSOutputStream extends FSO
response.close();
response.join();
} catch (InterruptedException e) {
+ DFSClient.LOG.warn("Caught exception ", e);
} finally {
response = null;
}
@@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSO
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
}
}
} else {
@@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSO
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
@@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSO
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(0, 0, bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock,
+ currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSO
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
}
} else {
// We already flushed up to this offset.
@@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSO
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock);
+ bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
@@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSO
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(0, 0, bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock,
+ currentSeqno++, this.checksum.getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSO
DFSClient.LOG.info("Could not complete file " + src + " retrying...");
}
} catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
}
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Oct 16 00:02:55 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -127,6 +128,43 @@ public class DFSUtil {
a.isDecommissioned() ? 1 : -1;
}
};
+
+
+ /**
+ * Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
+ * Decommissioned/stale nodes are moved to the end of the array on sorting
+ * with this comparator.
+ */
+ @InterfaceAudience.Private
+ public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+ private long staleInterval;
+
+ /**
+ * Constructor of DecomStaleComparator
+ *
+ * @param interval
+ * The time interval for marking datanodes as stale is passed from
+ * outside, since the interval may be changed dynamically
+ */
+ public DecomStaleComparator(long interval) {
+ this.staleInterval = interval;
+ }
+
+ @Override
+ public int compare(DatanodeInfo a, DatanodeInfo b) {
+ // Decommissioned nodes will still be moved to the end of the list
+ if (a.isDecommissioned()) {
+ return b.isDecommissioned() ? 0 : 1;
+ } else if (b.isDecommissioned()) {
+ return -1;
+ }
+ // Stale nodes will be moved behind the normal nodes
+ boolean aStale = a.isStale(staleInterval);
+ boolean bStale = b.isStale(staleInterval);
+ return aStale == bStale ? 0 : (aStale ? 1 : -1);
+ }
+ }
+
/**
* Address matcher for matching an address to local address
*/
@@ -451,6 +489,34 @@ public class DFSUtil {
}
/**
+ * @return a collection of all configured NN Kerberos principals.
+ */
+ public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
+ Set<String> principals = new HashSet<String>();
+ for (String nsId : DFSUtil.getNameServiceIds(conf)) {
+ if (HAUtil.isHAEnabled(conf, nsId)) {
+ for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
+ Configuration confForNn = new Configuration(conf);
+ NameNode.initializeGenericKeys(confForNn, nsId, nnId);
+ String principal = SecurityUtil.getServerPrincipal(confForNn
+ .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+ NameNode.getAddress(confForNn).getHostName());
+ principals.add(principal);
+ }
+ } else {
+ Configuration confForNn = new Configuration(conf);
+ NameNode.initializeGenericKeys(confForNn, nsId, null);
+ String principal = SecurityUtil.getServerPrincipal(confForNn
+ .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+ NameNode.getAddress(confForNn).getHostName());
+ principals.add(principal);
+ }
+ }
+
+ return principals;
+ }
+
+ /**
* Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
* the configuration.
*
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Oct 16 00:02:55 2012
@@ -535,10 +535,10 @@ public class DistributedFileSystem exten
@Override
public void close() throws IOException {
try {
- super.processDeleteOnExit();
- dfs.close();
- } finally {
+ dfs.closeOutputStreams(false);
super.close();
+ } finally {
+ dfs.close();
}
}
@@ -624,6 +624,16 @@ public class DistributedFileSystem exten
public void saveNamespace() throws AccessControlException, IOException {
dfs.saveNamespace();
}
+
+ /**
+ * Rolls the edit log on the active NameNode.
+ * Requires super-user privileges.
+ * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
+ * @return the transaction ID of the newly created segment
+ */
+ public long rollEdits() throws AccessControlException, IOException {
+ return dfs.rollEdits();
+ }
/**
* enable/disable/check restoreFaileStorage
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Tue Oct 16 00:02:55 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends
new Service("security.inter.datanode.protocol.acl",
InterDatanodeProtocol.class),
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+ new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Tue Oct 16 00:02:55 2012
@@ -105,4 +105,9 @@ public class HdfsConfiguration extends C
deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
}
+
+ public static void main(String[] args) {
+ init();
+ Configuration.dumpDeprecatedKeys();
+ }
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Oct 16 00:02:55 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -247,14 +248,16 @@ public class HftpFileSystem extends File
Credentials c;
try {
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
- } catch (Exception e) {
- LOG.info("Couldn't get a delegation token from " + nnHttpUrl +
- " using http.");
- if(LOG.isDebugEnabled()) {
- LOG.debug("error was ", e);
+ } catch (IOException e) {
+ if (e.getCause() instanceof ConnectException) {
+ LOG.warn("Couldn't connect to " + nnHttpUrl +
+ ", assuming security is disabled");
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception getting delegation token", e);
}
- //Maybe the server is in unsecure mode (that's bad but okay)
- return null;
+ throw e;
}
for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
if(LOG.isDebugEnabled()) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Tue Oct 16 00:02:55 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URL;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Oct 16 00:02:55 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
/**
* Create proxy objects to communicate with a remote NN. All remote access to an
@@ -243,106 +243,20 @@ public class NameNodeProxies {
return new NamenodeProtocolTranslatorPB(proxy);
}
- /**
- * Return the default retry policy used in RPC.
- *
- * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
- *
- * Otherwise, first unwrap ServiceException if possible, and then
- * (1) use multipleLinearRandomRetry for
- * - SafeModeException, or
- * - IOException other than RemoteException, or
- * - ServiceException; and
- * (2) use TRY_ONCE_THEN_FAIL for
- * - non-SafeMode RemoteException, or
- * - non-IOException.
- *
- * Note that dfs.client.retry.max < 0 is not allowed.
- */
- public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
- final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
- }
- if (multipleLinearRandomRetry == null) {
- //no retry
- return RetryPolicies.TRY_ONCE_THEN_FAIL;
- } else {
- return new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
- if (e instanceof ServiceException) {
- //unwrap ServiceException
- final Throwable cause = e.getCause();
- if (cause != null && cause instanceof Exception) {
- e = (Exception)cause;
- }
- }
-
- //see (1) and (2) in the javadoc of this method.
- final RetryPolicy p;
- if (e instanceof RemoteException) {
- final RemoteException re = (RemoteException)e;
- p = SafeModeException.class.getName().equals(re.getClassName())?
- multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
- } else if (e instanceof IOException || e instanceof ServiceException) {
- p = multipleLinearRandomRetry;
- } else { //non-IOException
- p = RetryPolicies.TRY_ONCE_THEN_FAIL;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("RETRY " + retries + ") policy="
- + p.getClass().getSimpleName() + ", exception=" + e);
- }
- LOG.info("RETRY " + retries + ") policy="
- + p.getClass().getSimpleName() + ", exception=" + e);
- return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
- }
-
- @Override
- public String toString() {
- return "RetryPolicy[" + multipleLinearRandomRetry + ", "
- + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
- + "]";
- }
- };
- }
- }
-
- /**
- * Return the MultipleLinearRandomRetry policy specified in the conf,
- * or null if the feature is disabled.
- * If the policy is specified in the conf but the policy cannot be parsed,
- * the default policy is returned.
- *
- * Conf property: N pairs of sleep-time and number-of-retries
- * dfs.client.retry.policy = "s1,n1,s2,n2,..."
- */
- private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
- final boolean enabled = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
- if (!enabled) {
- return null;
- }
-
- final String policy = conf.get(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-
- final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
- return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
- }
-
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
- final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
+ final RetryPolicy defaultPolicy =
+ RetryUtils.getDefaultRetryPolicy(
+ conf,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+ SafeModeException.class);
+
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Oct 16 00:02:55 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -35,7 +34,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -47,8 +45,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Tue Oct 16 00:02:55 2012
@@ -26,51 +26,131 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
/**
- * A cache of sockets.
+ * A cache of input stream sockets to Data Node.
*/
class SocketCache {
- static final Log LOG = LogFactory.getLog(SocketCache.class);
+ private static final Log LOG = LogFactory.getLog(SocketCache.class);
- private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
- private final int capacity;
+ @InterfaceAudience.Private
+ static class SocketAndStreams implements Closeable {
+ public final Socket sock;
+ public final IOStreamPair ioStreams;
+ long createTime;
+
+ public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+ this.sock = s;
+ this.ioStreams = ioStreams;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void close() {
+ if (ioStreams != null) {
+ IOUtils.closeStream(ioStreams.in);
+ IOUtils.closeStream(ioStreams.out);
+ }
+ IOUtils.closeSocket(sock);
+ }
- /**
- * Create a SocketCache with the given capacity.
- * @param capacity Max cache size.
- */
- public SocketCache(int capacity) {
- multimap = LinkedListMultimap.create();
- this.capacity = capacity;
- if (capacity <= 0) {
- LOG.debug("SocketCache disabled in configuration.");
+ public long getCreateTime() {
+ return this.createTime;
}
}
+ private Daemon daemon;
+ /** A map for per user per datanode. */
+ private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+ LinkedListMultimap.create();
+ private static int capacity;
+ private static long expiryPeriod;
+ private static SocketCache scInstance = new SocketCache();
+ private static boolean isInitedOnce = false;
+
+ public static synchronized SocketCache getInstance(int c, long e) {
+ // capacity is only initialized once
+ if (isInitedOnce == false) {
+ capacity = c;
+ expiryPeriod = e;
+
+ if (capacity == 0 ) {
+ LOG.info("SocketCache disabled.");
+ }
+ else if (expiryPeriod == 0) {
+ throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+ expiryPeriod + "when cache is enabled.");
+ }
+ isInitedOnce = true;
+ } else { //already initialized once
+ if (capacity != c || expiryPeriod != e) {
+ LOG.info("capacity and expiry periods already set to " + capacity +
+ " and " + expiryPeriod + " respectively. Cannot set it to " + c +
+ " and " + e);
+ }
+ }
+
+ return scInstance;
+ }
+
+ private boolean isDaemonStarted() {
+ return (daemon == null)? false: true;
+ }
+
+ private synchronized void startExpiryDaemon() {
+ // start daemon only if not already started
+ if (isDaemonStarted() == true) {
+ return;
+ }
+
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SocketCache.this.run();
+ } catch(InterruptedException e) {
+ //noop
+ } finally {
+ SocketCache.this.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(SocketCache.this);
+ }
+ });
+ daemon.start();
+ }
+
/**
* Get a cached socket to the given address.
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
public synchronized SocketAndStreams get(SocketAddress remote) {
+
if (capacity <= 0) { // disabled
return null;
}
-
- List<SocketAndStreams> socklist = multimap.get(remote);
- if (socklist == null) {
+
+ List<SocketAndStreams> sockStreamList = multimap.get(remote);
+ if (sockStreamList == null) {
return null;
}
- Iterator<SocketAndStreams> iter = socklist.iterator();
+ Iterator<SocketAndStreams> iter = sockStreamList.iterator();
while (iter.hasNext()) {
SocketAndStreams candidate = iter.next();
iter.remove();
@@ -86,14 +166,16 @@ class SocketCache {
* @param sock socket not used by anyone.
*/
public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+ Preconditions.checkNotNull(sock);
SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
s.close();
return;
}
-
- Preconditions.checkNotNull(sock);
+
+ startExpiryDaemon();
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
if (remoteAddr == null) {
@@ -106,7 +188,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
+ multimap.put(remoteAddr, s);
}
public synchronized int size() {
@@ -114,13 +196,34 @@ class SocketCache {
}
/**
+ * Evict and close sockets older than expiry period from the cache.
+ */
+ private synchronized void evictExpired(long expiryPeriod) {
+ while (multimap.size() != 0) {
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+ multimap.entries().iterator();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+ // if oldest socket expired, remove it
+ if (entry == null ||
+ System.currentTimeMillis() - entry.getValue().getCreateTime() <
+ expiryPeriod) {
+ break;
+ }
+ iter.remove();
+ SocketAndStreams s = entry.getValue();
+ s.close();
+ }
+ }
+
+ /**
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
- throw new IllegalStateException("Cannot evict from empty cache!");
+ throw new IllegalStateException("Cannot evict from empty cache! " +
+ "capacity: " + capacity);
}
Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
@@ -129,38 +232,31 @@ class SocketCache {
}
/**
- * Empty the cache, and close all sockets.
+ * Periodically check in the cache and expire the entries
+ * older than expiryPeriod milliseconds
*/
- public synchronized void clear() {
- for (SocketAndStreams s : multimap.values()) {
- s.close();
+ private void run() throws InterruptedException {
+ for(long lastExpiryTime = System.currentTimeMillis();
+ !Thread.interrupted();
+ Thread.sleep(expiryPeriod)) {
+ final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+ if (elapsed >= expiryPeriod) {
+ evictExpired(expiryPeriod);
+ lastExpiryTime = System.currentTimeMillis();
+ }
}
- multimap.clear();
- }
-
- @Override
- protected void finalize() {
clear();
+ throw new InterruptedException("Daemon Interrupted");
}
-
- @InterfaceAudience.Private
- static class SocketAndStreams implements Closeable {
- public final Socket sock;
- public final IOStreamPair ioStreams;
-
- public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
- this.sock = s;
- this.ioStreams = ioStreams;
- }
-
- @Override
- public void close() {
- if (ioStreams != null) {
- IOUtils.closeStream(ioStreams.in);
- IOUtils.closeStream(ioStreams.out);
- }
- IOUtils.closeSocket(sock);
+
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ private synchronized void clear() {
+ for (SocketAndStreams sockAndStream : multimap.values()) {
+ sockAndStream.close();
}
+ multimap.clear();
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Oct 16 00:02:55 2012
@@ -667,6 +667,18 @@ public interface ClientProtocol {
*/
public void saveNamespace() throws AccessControlException, IOException;
+
+ /**
+ * Roll the edit log.
+ * Requires superuser privileges.
+ *
+ * @throws AccessControlException if the superuser privilege is violated
+ * @throws IOException if log roll fails
+ * @return the txid of the new segment
+ */
+ @Idempotent
+ public long rollEdits() throws AccessControlException, IOException;
+
/**
* Enable/Disable restore failed storage.
* <p>
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Oct 16 00:02:55 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
/**
* This class extends the primary identifier of a Datanode with ephemeral
@@ -321,7 +322,24 @@ public class DatanodeInfo extends Datano
}
return adminState;
}
-
+
+ /**
+ * Check if the datanode is in stale state. Here if
+ * the namenode has not received heartbeat msg from a
+ * datanode for more than staleInterval (default value is
+ * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+ * the datanode will be treated as stale node.
+ *
+ * @param staleInterval
+ * the time interval for marking the node as stale. If the last
+ * update time is beyond the given time interval, the node will be
+ * marked as stale.
+ * @return true if the node is stale
+ */
+ public boolean isStale(long staleInterval) {
+ return (Time.now() - lastUpdate) >= staleInterval;
+ }
+
/**
* Sets the admin state of this node.
*/
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Tue Oct 16 00:02:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
@@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil {
return ret;
}
+ public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
+ return DataChecksum.Type.valueOf(type.name());
+ }
+
+ public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
+ return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+ }
+
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
@@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil {
return new ExactSizeInputStream(input, size);
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Tue Oct 16 00:02:55 2012
@@ -24,9 +24,9 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -52,7 +52,7 @@ public abstract class DataTransferProtoU
}
public static ChecksumProto toProto(DataChecksum checksum) {
- ChecksumType type = ChecksumType.valueOf(checksum.getChecksumType().name());
+ ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name());
if (type == null) {
throw new IllegalArgumentException(
"Can't convert checksum to protobuf: " + checksum);
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Tue Oct 16 00:02:55 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import com.google.protobuf.TextFormat;
+
/** Pipeline Acknowledgment **/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -120,6 +122,6 @@ public class PipelineAck {
@Override //Object
public String toString() {
- return proto.toString();
+ return TextFormat.shortDebugString(proto); // string form now comes from protobuf's TextFormat.shortDebugString rather than the proto.toString() call removed above
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Oct 16 00:02:55 2012
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
@@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe
}
}
+
+ @Override // server-side translator method: calls server.rollEdits() and packages the returned long into a response proto
+ public RollEditsResponseProto rollEdits(RpcController controller,
+ RollEditsRequestProto request) throws ServiceException { // neither controller nor request is read inside this body
+ try {
+ long txid = server.rollEdits(); // value returned by the delegate; stored below as the response's newSegmentTxId field
+ return RollEditsResponseProto.newBuilder()
+ .setNewSegmentTxId(txid)
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e); // only ServiceException is declared, so the IOException is rethrown wrapped as its cause
+ }
+ }
+
static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
RefreshNodesResponseProto.newBuilder().build();
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Oct 16 00:02:55 2012
@@ -87,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
@@ -525,6 +527,17 @@ public class ClientNamenodeProtocolTrans
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override // client-side translator method: issues the rollEdits call through rpcProxy and returns the txid from the response
+ public long rollEdits() throws AccessControlException, IOException {
+ RollEditsRequestProto req = RollEditsRequestProto.getDefaultInstance(); // no fields are set on the request; the default instance is sent as-is
+ try {
+ RollEditsResponseProto resp = rpcProxy.rollEdits(null, req); // null is passed in the RpcController position
+ return resp.getNewSegmentTxId(); // the response's newSegmentTxId field is the method's result
+ } catch (ServiceException se) {
+ throw ProtobufHelper.getRemoteException(se); // rethrow whatever the helper derives from the ServiceException (presumably an IOException, given the throws clause; confirm in ProtobufHelper)
+ }
+ }
@Override
public boolean restoreFailedStorage(String arg)
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Oct 16 00:02:55 2012
@@ -326,12 +326,15 @@ public class PBHelper {
}
public static RemoteEditLogProto convert(RemoteEditLog log) {
- return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
- .setStartTxId(log.getStartTxId()).build();
+ return RemoteEditLogProto.newBuilder() // build the proto form of the given RemoteEditLog, one setter per line
+ .setStartTxId(log.getStartTxId())
+ .setEndTxId(log.getEndTxId())
+ .setIsInProgress(log.isInProgress()).build(); // newly copied field: the removed lines set only the start and end txids
}
public static RemoteEditLog convert(RemoteEditLogProto l) {
- return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+ return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(), // inverse of the proto-building convert: rebuild a RemoteEditLog from the proto's fields
+ l.getIsInProgress()); // third argument is the proto's isInProgress value; the removed line passed only the two txids
}
public static RemoteEditLogManifestProto convert(
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Tue Oct 16 00:02:55 2012
@@ -118,7 +118,6 @@ public class BlockTokenIdentifier extend
return a == null ? b == null : a.equals(b);
}
- /** {@inheritDoc} */
@Override
public boolean equals(Object obj) {
if (obj == this) {
@@ -135,7 +134,6 @@ public class BlockTokenIdentifier extend
return false;
}
- /** {@inheritDoc} */
@Override
public int hashCode() {
return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Tue Oct 16 00:02:55 2012
@@ -19,12 +19,14 @@ package org.apache.hadoop.hdfs.server.bl
import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ContentSummary;
/**
* This interface is used by the block manager to expose a
* few characteristics of a collection of Block/BlockUnderConstruction.
*/
+@InterfaceAudience.Private
public interface BlockCollection {
/**
* Get the last block of the collection.
@@ -56,7 +58,7 @@ public interface BlockCollection {
* Get block replication for the collection
* @return block replication value
*/
- public short getReplication();
+ public short getBlockReplication();
/**
* Get the name of the collection.
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Oct 16 00:02:55 2012
@@ -73,7 +73,7 @@ public class BlockInfo extends Block imp
* @param from BlockInfo to copy from.
*/
protected BlockInfo(BlockInfo from) {
- this(from, from.bc.getReplication());
+ this(from, from.bc.getBlockReplication()); // same delegation as the removed line, using the accessor's new name getBlockReplication(); from.bc is dereferenced here without a null check
this.bc = from.bc;
}
@@ -335,7 +335,7 @@ public class BlockInfo extends Block imp
BlockUCState s, DatanodeDescriptor[] targets) {
if(isComplete()) {
return new BlockInfoUnderConstruction(
- this, getBlockCollection().getReplication(), s, targets);
+ this, getBlockCollection().getBlockReplication(), s, targets);
}
// the block is already under construction
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Oct 16 00:02:55 2012
@@ -59,7 +59,6 @@ import static org.apache.hadoop.util.Exi
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -364,11 +363,10 @@ public class BlockManager {
replicationThread.join(3000);
}
} catch (InterruptedException ie) {
- } finally {
- if (pendingReplications != null) pendingReplications.stop();
- blocksMap.close();
- datanodeManager.close();
}
+ datanodeManager.close();
+ pendingReplications.stop();
+ blocksMap.close();
}
/** @return the datanodeManager */
@@ -999,7 +997,7 @@ public class BlockManager {
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
- if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) {
+ if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
} else if (namesystem.isPopulatingReplQueues()) {
@@ -1137,7 +1135,7 @@ public class BlockManager {
continue;
}
- requiredReplication = bc.getReplication();
+ requiredReplication = bc.getBlockReplication();
// get a source data-node
containingNodes = new ArrayList<DatanodeDescriptor>();
@@ -1223,7 +1221,7 @@ public class BlockManager {
neededReplications.decrementReplicationIndex(priority);
continue;
}
- requiredReplication = bc.getReplication();
+ requiredReplication = bc.getBlockReplication();
// do not schedule more if enough replicas is already pending
NumberReplicas numReplicas = countNodes(block);
@@ -1316,8 +1314,9 @@ public class BlockManager {
final HashMap<Node, Node> excludedNodes,
final long blocksize) throws IOException {
// choose targets for the new block to be allocated.
- final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
- src, numOfReplicas, client, excludedNodes, blocksize);
+ final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+ numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
+ excludedNodes, blocksize);
if (targets.length < minReplication) {
throw new IOException("File " + src + " could only be replicated to "
+ targets.length + " nodes instead of minReplication (="
@@ -2090,7 +2089,7 @@ assert storedBlock.findDatanode(dn) < 0
}
// handle underReplication/overReplication
- short fileReplication = bc.getReplication();
+ short fileReplication = bc.getBlockReplication();
if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
num.decommissionedReplicas(), fileReplication);
@@ -2229,7 +2228,7 @@ assert storedBlock.findDatanode(dn) < 0
return MisReplicationResult.UNDER_CONSTRUCTION;
}
// calculate current replication
- short expectedReplication = bc.getReplication();
+ short expectedReplication = bc.getBlockReplication();
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
@@ -2700,7 +2699,7 @@ assert storedBlock.findDatanode(dn) < 0
while(it.hasNext()) {
final Block block = it.next();
BlockCollection bc = blocksMap.getBlockCollection(block);
- short expectedReplication = bc.getReplication();
+ short expectedReplication = bc.getBlockReplication();
NumberReplicas num = countNodes(block);
int numCurrentReplica = num.liveReplicas();
if (numCurrentReplica > expectedReplication) {
@@ -2846,7 +2845,7 @@ assert storedBlock.findDatanode(dn) < 0
if (bc == null) { // block does not belong to any file
return 0;
}
- return bc.getReplication();
+ return bc.getBlockReplication();
}