Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [17/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/AggregateWordHistogram.java Fri Jun 21 06:37:27 2013
@@ -72,8 +72,9 @@ public class AggregateWordHistogram {
*/
@SuppressWarnings("unchecked")
public static void main(String[] args) throws IOException {
- JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args
- , new Class[] {AggregateWordHistogramPlugin.class});
+ JobConf conf = ValueAggregatorJob.createValueAggregatorJob(args,
+ new Class[] { AggregateWordHistogramPlugin.class },
+ AggregateWordHistogram.class);
JobClient.runJob(conf);
}
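
The extra class argument presumably lets createValueAggregatorJob set the job jar from the caller, much like the JobConf(Class) constructor does. A minimal sketch of that idiom (demo class name hypothetical):

    import org.apache.hadoop.examples.AggregateWordHistogram;
    import org.apache.hadoop.mapred.JobConf;

    public class JarFromClassDemo {
      public static void main(String[] args) {
        // Constructing a JobConf from a class records the jar that contains
        // the class, so tasks launched on the cluster can load the code.
        JobConf conf = new JobConf(AggregateWordHistogram.class);
        System.out.println(conf.getJar());
      }
    }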
Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/WordCount.java Fri Jun 21 06:37:27 2013
@@ -1,3 +1,18 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
package org.apache.hadoop.examples;
import java.io.IOException;
Modified: hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ hadoop/common/branches/branch-1-win/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Fri Jun 21 06:37:27 2013
@@ -162,17 +162,36 @@ public class DistributedPentomino extend
int height = 10;
Class<? extends Pentomino> pentClass;
if (args.length == 0) {
- System.out.println("pentomino <output>");
+ System.out
+ .println("Usage: pentomino <output> [-depth #] [-height #] [-width #]");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
conf = new JobConf(getConf());
+
+  // Pick up the parameters if the user has set them in the configuration
width = conf.getInt("pent.width", width);
height = conf.getInt("pent.height", height);
depth = conf.getInt("pent.depth", depth);
pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equalsIgnoreCase("-depth")) {
+ depth = Integer.parseInt(args[++i].trim());
+ } else if (args[i].equalsIgnoreCase("-height")) {
+ height = Integer.parseInt(args[++i].trim());
+ } else if (args[i].equalsIgnoreCase("-width")) {
+ width = Integer.parseInt(args[++i].trim());
+ }
+ }
+
+  // Write the parameters back so MR tasks pick them up, whether the
+  // user set them or not
+ conf.setInt("pent.width", width);
+ conf.setInt("pent.height", height);
+ conf.setInt("pent.depth", depth);
+
Path output = new Path(args[0]);
Path input = new Path(output + "_input");
FileSystem fileSys = FileSystem.get(conf);
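
With the new flags, a run might look like this (jar name and values hypothetical):

    bin/hadoop jar hadoop-examples.jar pentomino /pent-out -depth 5 -height 10 -width 9

Because the parsed values are written back into the JobConf, map tasks see the same pent.width, pent.height, and pent.depth settings as the launcher.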
Modified: hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am (original)
+++ hadoop/common/branches/branch-1-win/src/examples/pipes/Makefile.am Fri Jun 21 06:37:27 2013
@@ -17,7 +17,7 @@ ACLOCAL_AMFLAGS = -I ../../c++/utils/m4
AM_CXXFLAGS=-Wall -I$(HADOOP_UTILS_PREFIX)/include \
-I$(HADOOP_PIPES_PREFIX)/include
LDADD=-L$(HADOOP_UTILS_PREFIX)/lib -L$(HADOOP_PIPES_PREFIX)/lib \
- -lhadooppipes -lhadooputils
+ -lhadooppipes -lhadooputils -lcrypto
bin_PROGRAMS= wordcount-simple wordcount-part wordcount-nopipe pipes-sort
Modified: hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/hdfs-default.xml Fri Jun 21 06:37:27 2013
@@ -38,7 +38,7 @@ creations/deletions), or "all".</descrip
<name>dfs.datanode.address</name>
<value>0.0.0.0:50010</value>
<description>
- The address where the datanode server will listen to.
+ The datanode server address and port for data transfer.
If the port is 0 then the server will start on a free port.
</description>
</property>
@@ -333,9 +333,24 @@ creations/deletions), or "all".</descrip
<description>
Specifies the percentage of blocks that should satisfy
the minimal replication requirement defined by dfs.replication.min.
- Values less than or equal to 0 mean not to start in safe mode.
+ Values less than or equal to 0 mean not to wait for any particular
+ percentage of blocks before exiting safemode.
Values greater than 1 will make safe mode permanent.
</description>
+ </property>
+
+<property>
+ <name>dfs.namenode.safemode.min.datanodes</name>
+ <value>0</value>
+ <description>
+ Specifies the number of datanodes that must be considered alive
+ before the name node exits safemode.
+ Values less than or equal to 0 mean not to take the number of live
+ datanodes into account when deciding whether to remain in safe mode
+ during startup.
+ Values greater than the number of datanodes in the cluster
+ will make safe mode permanent.
+ </description>
</property>
<property>
@@ -416,10 +431,10 @@ creations/deletions), or "all".</descrip
<property>
<name>dfs.support.append</name>
- <value>false</value>
- <description>Does HDFS allow appends to files?
- This is currently set to false because there are bugs in the
- "append code" and is not supported in any prodction cluster.
+ <description>
+ This option is no longer supported. HBase no longer requires that
+ this option be enabled as sync is now enabled by default. See
+ HADOOP-8230 for additional information.
</description>
</property>
@@ -464,6 +479,234 @@ creations/deletions), or "all".</descrip
</property>
<property>
+ <name>dfs.datanode.readahead.bytes</name>
+ <value>4193404</value>
+ <description>
+ While reading block files, if the Hadoop native libraries are available,
+ the datanode can use the posix_fadvise system call to explicitly
+ page data into the operating system buffer cache ahead of the current
+ reader's position. This can improve performance especially when
+ disks are highly contended.
+
+ This configuration specifies the number of bytes ahead of the current
+ read position which the datanode will attempt to read ahead. This
+ feature may be disabled by configuring this property to 0.
+
+ If the native libraries are not available, this configuration has no
+ effect.
+ </description>
+</property>
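
The default of 4193404 bytes is roughly 4 MB of readahead. As with any of these defaults, the value can be overridden in hdfs-site.xml; for example, to disable readahead entirely:

    <property>
      <name>dfs.datanode.readahead.bytes</name>
      <value>0</value>
    </property>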
+
+<property>
+ <name>dfs.datanode.drop.cache.behind.reads</name>
+ <value>false</value>
+ <description>
+    In some workloads, the data read from HDFS is known to be large
+    enough that it is unlikely to be useful to cache it in the
+ operating system buffer cache. In this case, the DataNode may be
+ configured to automatically purge all data from the buffer cache
+ after it is delivered to the client. This behavior is automatically
+ disabled for workloads which read only short sections of a block
+    (e.g. HBase random-IO workloads).
+
+ This may improve performance for some workloads by freeing buffer
+    cache space for more cacheable data.
+
+ If the Hadoop native libraries are not available, this configuration
+ has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.drop.cache.behind.writes</name>
+ <value>false</value>
+ <description>
+    In some workloads, the data written to HDFS is known to be large
+    enough that it is unlikely to be useful to cache it in the
+ operating system buffer cache. In this case, the DataNode may be
+ configured to automatically purge all data from the buffer cache
+ after it is written to disk.
+
+ This may improve performance for some workloads by freeing buffer
+    cache space for more cacheable data.
+
+ If the Hadoop native libraries are not available, this configuration
+ has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.sync.behind.writes</name>
+ <value>false</value>
+ <description>
+ If this configuration is enabled, the datanode will instruct the
+ operating system to enqueue all written data to the disk immediately
+ after it is written. This differs from the usual OS policy which
+ may wait for up to 30 seconds before triggering writeback.
+
+ This may improve performance for some workloads by smoothing the
+ IO profile for data written to disk.
+
+ If the Hadoop native libraries are not available, this configuration
+ has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.use.datanode.hostname</name>
+ <value>false</value>
+ <description>Whether clients should use datanode hostnames when
+ connecting to datanodes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.use.datanode.hostname</name>
+ <value>false</value>
+ <description>Whether datanodes should use datanode hostnames when
+ connecting to other datanodes for data transfer.
+ </description>
+</property>
+
+<property>
+ <name>dfs.client.local.interfaces</name>
+ <value></value>
+ <description>A comma separated list of network interface names to use
+ for data transfer between the client and datanodes. When creating
+ a connection to read from or write to a datanode, the client
+ chooses one of the specified interfaces at random and binds its
+ socket to the IP of that interface. Individual names may be
+    specified as either an interface name (e.g. "eth0"), a subinterface
+    name (e.g. "eth0:0"), or an IP address (which may be specified using
+ CIDR notation to match a range of IPs).
+ </description>
+</property>
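
A sample value mixing the three supported forms (interface names and subnet hypothetical):

    <property>
      <name>dfs.client.local.interfaces</name>
      <value>eth0,eth1:0,192.168.0.0/24</value>
    </property>

Each datanode connection is then bound to one of the matching local IPs, chosen at random.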
+
+<property>
+ <name>dfs.image.transfer.bandwidthPerSec</name>
+ <value>0</value>
+ <description>
+ Specifies the maximum amount of bandwidth that can be utilized
+        for image transfer, in bytes per second.
+ A default value of 0 indicates that throttling is disabled.
+ </description>
+</property>
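
For example, to cap checkpoint image transfers at roughly 1 MB/s (value hypothetical):

    <property>
      <name>dfs.image.transfer.bandwidthPerSec</name>
      <value>1048576</value>
    </property>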
+
+<property>
+ <name>dfs.webhdfs.enabled</name>
+ <value>false</value>
+ <description>
+ Enable WebHDFS (REST API) in Namenodes and Datanodes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+ <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+ <name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
+ <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+ <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
+ <value>0.32f</value>
+ <description>
+ *Note*: Advanced property. Change with caution.
+    This determines the percentage of block
+    invalidations (deletes) to issue with a single DN heartbeat
+    deletion command. The final deletion count is determined by applying this
+    percentage to the number of live nodes in the system.
+    The resulting number is the number of blocks from the deletion list
+    chosen for invalidation over a single heartbeat of a single DN.
+ Value should be a positive, non-zero percentage in float notation (X.Yf),
+ with 1.0f meaning 100%.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
+ <value>2</value>
+ <description>
+ *Note*: Advanced property. Change with caution.
+    This determines the total number of block transfers to begin in
+    parallel at a DN, for replication, when such a command list is
+    sent over a DN heartbeat by the NN. The actual number is obtained by
+    multiplying this multiplier by the total number of live nodes in the
+    cluster. The resulting number is the number of blocks to begin
+    transfers for immediately, per DN heartbeat. This number can be any
+    positive, non-zero integer.
+ </description>
+</property>
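
(For scale: with the default multiplier of 2 and 50 live datanodes, the NN would schedule up to 2 x 50 = 100 block transfers per DN heartbeat.)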
+
+<property>
+ <name>dfs.namenode.avoid.read.stale.datanode</name>
+ <value>false</value>
+ <description>
+    Indicates whether to avoid reading from "stale" datanodes whose
+ heartbeat messages have not been received by the namenode
+ for more than a specified time interval. Stale datanodes will be
+ moved to the end of the node list returned for reading. See
+ dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.avoid.write.stale.datanode</name>
+ <value>false</value>
+ <description>
+    Indicates whether to avoid writing to "stale" datanodes whose
+ heartbeat messages have not been received by the namenode
+ for more than a specified time interval. Writes will avoid using
+ stale datanodes unless more than a configured ratio
+ (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as
+ stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
+ for reads.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.stale.datanode.interval</name>
+ <value>30000</value>
+ <description>
+    Default time interval for marking a datanode as "stale": if
+    the namenode has not received a heartbeat message from a datanode for
+    more than this interval, the datanode is marked and treated
+    as "stale". The interval cannot be too small, since an overly
+    short interval makes the stale state change too frequently.
+    We therefore enforce a minimum stale interval (by default 3 times
+    the heartbeat interval) and guarantee that the configured interval
+    cannot be less than that minimum.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.write.stale.datanode.ratio</name>
+ <value>0.5f</value>
+ <description>
+    When the ratio of stale datanodes to total datanodes is greater
+    than this value, stop avoiding writes to stale nodes, so as to
+    prevent causing hotspots on the remaining nodes.
+ </description>
+</property>
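
To enable stale-node avoidance for both reads and writes (values hypothetical):

    <property>
      <name>dfs.namenode.avoid.read.stale.datanode</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.namenode.avoid.write.stale.datanode</name>
      <value>true</value>
    </property>

With the defaults above, a datanode silent for more than 30000 ms is then deprioritized for reads, and avoided for writes until more than half of the datanodes are stale.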
+
+<property>
+ <name>dfs.datanode.plugins</name>
+ <value></value>
+ <description>Comma-separated list of datanode plug-ins to be activated.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.plugins</name>
+ <value></value>
+ <description>Comma-separated list of namenode plug-ins to be activated.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>false</value>
<description>
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Jun 21 06:37:27 2013
@@ -84,11 +84,11 @@ class BlockReaderLocal extends FSInputCh
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
- DatanodeInfo node, Configuration conf, int socketTimeout)
- throws IOException {
+ DatanodeInfo node, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
if (proxy == null) {
proxy = DFSClient.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout);
+ socketTimeout, connectToDnViaHostname);
}
return proxy;
}
@@ -135,13 +135,14 @@ class BlockReaderLocal extends FSInputCh
*/
static BlockReaderLocal newBlockReader(Configuration conf,
String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
- int socketTimeout, long startOffset, long length) throws IOException {
+ int socketTimeout, long startOffset, long length, boolean connectToDnViaHostname)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
- pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+ pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token, connectToDnViaHostname);
}
// check to see if the file exists. It may so happen that the
@@ -216,11 +217,12 @@ class BlockReaderLocal extends FSInputCh
private static BlockLocalPathInfo getBlockPathInfo(Block blk,
DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token) throws IOException {
+ Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
- conf, timeout);
+ conf, timeout, connectToDnViaHostname);
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Jun 21 06:37:27 2013
@@ -22,7 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
@@ -54,20 +58,20 @@ public abstract class ByteRangeInputStre
return url;
}
- protected abstract HttpURLConnection openConnection() throws IOException;
-
- protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+ /** Connect to server with a data offset. */
+ protected abstract HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException;
}
enum StreamStatus {
- NORMAL, SEEK
+ NORMAL, SEEK, CLOSED
}
protected InputStream in;
protected URLOpener originalURL;
protected URLOpener resolvedURL;
protected long startPos = 0;
protected long currentPos = 0;
- protected long filelength;
+ protected Long fileLength = null;
StreamStatus status = StreamStatus.SEEK;
@@ -82,72 +86,123 @@ public abstract class ByteRangeInputStre
this.resolvedURL = r;
}
- protected abstract void checkResponseCode(final HttpURLConnection connection
- ) throws IOException;
-
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
- private InputStream getInputStream() throws IOException {
- if (status != StreamStatus.NORMAL) {
-
- if (in != null) {
- in.close();
- in = null;
- }
-
- // Use the original url if no resolved url exists, eg. if
- // it's the first time a request is made.
- final URLOpener opener =
- (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
- final HttpURLConnection connection = opener.openConnection(startPos);
- connection.connect();
- checkResponseCode(connection);
-
+ protected InputStream getInputStream() throws IOException {
+ switch (status) {
+ case NORMAL:
+ break;
+ case SEEK:
+ if (in != null) {
+ in.close();
+ }
+ in = openInputStream();
+ status = StreamStatus.NORMAL;
+ break;
+ case CLOSED:
+ throw new IOException("Stream closed");
+ }
+ return in;
+ }
+
+ protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, e.g. if
+ // it's the first time a request is made.
+ final boolean resolved = resolvedURL.getURL() != null;
+ final URLOpener opener = resolved? resolvedURL: originalURL;
+
+ final HttpURLConnection connection = opener.connect(startPos, resolved);
+ resolvedURL.setURL(getResolvedUrl(connection));
+
+ InputStream in = connection.getInputStream();
+ final Map<String, List<String>> headers = connection.getHeaderFields();
+ if (isChunkedTransferEncoding(headers)) {
+ // file length is not known
+ fileLength = null;
+ } else {
+ // for non-chunked transfer-encoding, get content-length
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
- filelength = (cl == null) ? -1 : Long.parseLong(cl);
- in = connection.getInputStream();
+ if (cl == null) {
+ throw new IOException(StreamFile.CONTENT_LENGTH + " is missing: "
+ + headers);
+ }
+ final long streamlength = Long.parseLong(cl);
+ fileLength = startPos + streamlength;
- resolvedURL.setURL(getResolvedUrl(connection));
- status = StreamStatus.NORMAL;
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ in = new BoundedInputStream(in, streamlength);
}
-
+
return in;
}
- private void update(final boolean isEOF, final int n)
- throws IOException {
- if (!isEOF) {
+ private static boolean isChunkedTransferEncoding(
+ final Map<String, List<String>> headers) {
+ return contains(headers, "Transfer-Encoding", "chunked")
+ || contains(headers, "TE", "chunked");
+ }
+
+ /** Does the HTTP header map contain the given key, value pair? */
+ private static boolean contains(final Map<String, List<String>> headers,
+ final String key, final String value) {
+ final List<String> values = headers.get(key);
+ if (values != null) {
+ for(String v : values) {
+ for(final StringTokenizer t = new StringTokenizer(v, ",");
+ t.hasMoreTokens(); ) {
+ if (value.equalsIgnoreCase(t.nextToken())) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private int update(final int n) throws IOException {
+ if (n != -1) {
currentPos += n;
- } else if (currentPos < filelength) {
+ } else if (fileLength != null && currentPos < fileLength) {
throw new IOException("Got EOF but currentPos = " + currentPos
- + " < filelength = " + filelength);
+ + " < filelength = " + fileLength);
}
+ return n;
}
+ @Override
public int read() throws IOException {
final int b = getInputStream().read();
- update(b == -1, 1);
+ update((b == -1)? -1 : 1);
return b;
}
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException{
+ return update(getInputStream().read(b, off, len));
+ }
+
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
+ @Override
public void seek(long pos) throws IOException {
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
- status = StreamStatus.SEEK;
+ if (status != StreamStatus.CLOSED) {
+ status = StreamStatus.SEEK;
+ }
}
}
/**
* Return the current offset from the start of the file
*/
+ @Override
public long getPos() throws IOException {
return currentPos;
}
@@ -156,7 +211,17 @@ public abstract class ByteRangeInputStre
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
+ @Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
-}
\ No newline at end of file
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ status = StreamStatus.CLOSED;
+ }
+}
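
For reference, the BoundedInputStream used above (from Apache commons-io) simply caps how many bytes can be read from the wrapped stream; a minimal standalone sketch (demo class name hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.commons.io.input.BoundedInputStream;

    public class BoundedReadDemo {
      public static void main(String[] args) throws IOException {
        InputStream raw = new ByteArrayInputStream(new byte[10]);
        // Cap the wrapped stream at 4 bytes; read() returns -1 afterwards.
        // This is the property the patch relies on to avoid the >2GB
        // request-stream hang described in the comment above.
        InputStream capped = new BoundedInputStream(raw, 4);
        int n = 0;
        while (capped.read() != -1) {
          n++;
        }
        System.out.println(n); // prints 4
      }
    }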
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Jun 21 06:37:27 2013
@@ -21,10 +21,12 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.*;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -59,6 +62,8 @@ import java.nio.ByteBuffer;
import javax.net.SocketFactory;
+import sun.net.util.IPAddressUtil;
+
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
@@ -75,13 +80,12 @@ public class DFSClient implements FSCons
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
public final ClientProtocol namenode;
- private final ClientProtocol rpcNamenode;
+ final ClientProtocol rpcNamenode;
private final InetSocketAddress nnAddress;
final UserGroupInformation ugi;
volatile boolean clientRunning = true;
- Random r = new Random();
+ static Random r = new Random();
final String clientName;
- final LeaseChecker leasechecker = new LeaseChecker();
private Configuration conf;
private long defaultBlockSize;
private short defaultReplication;
@@ -93,6 +97,8 @@ public class DFSClient implements FSCons
private final FileSystem.Statistics stats;
private int maxBlockAcquireFailures;
private boolean shortCircuitLocalReads;
+ private boolean connectToDnViaHostname;
+ private SocketAddress[] localInterfaceAddrs;
/**
* We assume we're talking to another CDH server, which supports
@@ -101,7 +107,18 @@ public class DFSClient implements FSCons
*/
private volatile boolean serverSupportsHdfs630 = true;
private volatile boolean serverSupportsHdfs200 = true;
-
+ final int hdfsTimeout; // timeout value for a DFS operation.
+ private final String authority;
+
+ /**
+ * A map from file names to {@link DFSOutputStream} objects
+ * that are currently being written by this client.
+ * Note that a file can only be written by a single client.
+ */
+ private final Map<String, DFSOutputStream> filesBeingWritten
+ = new HashMap<String, DFSOutputStream>();
+
+ /** Create a {@link NameNode} proxy */
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
return createNamenode(NameNode.getAddress(conf), conf);
}
@@ -109,7 +126,7 @@ public class DFSClient implements FSCons
public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
return createNamenode(createRPCNamenode(nameNodeAddr, conf,
- UserGroupInformation.getCurrentUser()));
+ UserGroupInformation.getCurrentUser()), conf);
}
private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
@@ -117,11 +134,32 @@ public class DFSClient implements FSCons
throws IOException {
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
ClientProtocol.versionID, nameNodeAddr, ugi, conf,
- NetUtils.getSocketFactory(conf, ClientProtocol.class));
- }
+ NetUtils.getSocketFactory(conf, ClientProtocol.class), 0,
+ RetryUtils.getMultipleLinearRandomRetry(
+ conf,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+ ),
+ false);
+ }
- private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
- throws IOException {
+ private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
+ Configuration conf) throws IOException {
+ //default policy
+ @SuppressWarnings("unchecked")
+ final RetryPolicy defaultPolicy =
+ RetryUtils.getDefaultRetryPolicy(
+ conf,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+ SafeModeException.class
+ );
+
+ //create policy
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@@ -133,23 +171,27 @@ public class DFSClient implements FSCons
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(RemoteException.class,
RetryPolicies.retryByRemoteException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
+ defaultPolicy, remoteExceptionToPolicyMap));
RetryPolicy methodPolicy = RetryPolicies.retryByException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ defaultPolicy, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("create", methodPolicy);
- return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
- rpcNamenode, methodNameToPolicyMap);
+ final ClientProtocol cp = (ClientProtocol) RetryProxy.create(ClientProtocol.class,
+ rpcNamenode, defaultPolicy, methodNameToPolicyMap);
+ RPC.checkVersion(ClientProtocol.class, ClientProtocol.versionID, cp);
+ return cp;
}
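
The new retry behavior is driven by two client-side keys; enabling it might look like the snippet below. The key names are inferred from the DFSConfigKeys constants referenced above, and the spec format (alternating sleep-millis,retry-count pairs) is an assumption about RetryUtils:

    <property>
      <name>dfs.client.retry.policy.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>dfs.client.retry.policy.spec</name>
      <value>10000,6,60000,10</value>
    </property>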
/** Create {@link ClientDatanodeProtocol} proxy with block/token */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
- DatanodeID datanodeid, Configuration conf,
- Block block, Token<BlockTokenIdentifier> token, int socketTimeout) throws IOException {
- InetSocketAddress addr = NetUtils.makeSocketAddr(
- datanodeid.getHost(), datanodeid.getIpcPort());
+ DatanodeInfo di, Configuration conf,
+ Block block, Token<BlockTokenIdentifier> token, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+ LOG.debug("Connecting to " + dnName);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
@@ -163,10 +205,11 @@ public class DFSClient implements FSCons
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
- DatanodeID datanodeid, Configuration conf, int socketTimeout)
- throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ DatanodeInfo di, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = di.getNameWithIpcPort(connectToDnViaHostname);
+ LOG.debug("Connecting to " + dnName);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
@@ -222,20 +265,20 @@ public class DFSClient implements FSCons
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
+ this.hdfsTimeout = Client.getTimeout(conf);
ugi = UserGroupInformation.getCurrentUser();
+ this.authority = nameNodeAddr == null? "null":
+ nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+ String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
+ this.clientName = "DFSClient_" + taskId + "_" +
+ r.nextInt() + "_" + Thread.currentThread().getId();
- String taskId = conf.get("mapred.task.id");
- if (taskId != null) {
- this.clientName = "DFSClient_" + taskId;
- } else {
- this.clientName = "DFSClient_" + r.nextInt();
- }
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
- this.namenode = createNamenode(this.rpcNamenode);
+ this.namenode = createNamenode(this.rpcNamenode, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
@@ -251,6 +294,23 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
}
+ this.connectToDnViaHostname = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+ }
+ String localInterfaces[] =
+ conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
+ if (null == localInterfaces) {
+ localInterfaces = new String[0];
+ }
+ this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
+ if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
+ LOG.debug("Using local interfaces [" +
+ StringUtils.join(",",localInterfaces)+ "] with addresses [" +
+ StringUtils.join(",",localInterfaceAddrs) + "]");
+ }
}
static int getMaxBlockAcquireFailures(Configuration conf) {
@@ -264,20 +324,116 @@ public class DFSClient implements FSCons
throw result;
}
}
+
+ /** Return the lease renewer instance. The renewer thread won't start
+ * until the first output stream is created. The same instance will
+ * be returned until all output streams are closed.
+ */
+ public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+ return LeaseRenewer.getInstance(authority, ugi, this);
+ }
+
+ /** Get a lease and start automatic renewal */
+ private void beginFileLease(final String src, final DFSOutputStream out)
+ throws IOException {
+ getLeaseRenewer().put(src, out, this);
+ }
+
+ /** Stop renewal of lease for the file. */
+ void endFileLease(final String src) throws IOException {
+ getLeaseRenewer().closeFile(src, this);
+ }
+
+ /** Put a file. Only called from LeaseRenewer, where proper locking is
+ * enforced to consistently update its local dfsclients array and
+ * client's filesBeingWritten map.
+ */
+ void putFileBeingWritten(final String src, final DFSOutputStream out) {
+ synchronized(filesBeingWritten) {
+ filesBeingWritten.put(src, out);
+ }
+ }
+
+ /** Remove a file. Only called from LeaseRenewer. */
+ void removeFileBeingWritten(final String src) {
+ synchronized(filesBeingWritten) {
+ filesBeingWritten.remove(src);
+ }
+ }
+
+ /** Is file-being-written map empty? */
+ boolean isFilesBeingWrittenEmpty() {
+ synchronized(filesBeingWritten) {
+ return filesBeingWritten.isEmpty();
+ }
+ }
+
+ /**
+ * Renew leases.
+ * @return true if lease was renewed. May return false if this
+ * client has been closed or has no files open.
+ **/
+ boolean renewLease() throws IOException {
+ if (clientRunning && !isFilesBeingWrittenEmpty()) {
+ namenode.renewLease(clientName);
+ return true;
+ }
+ return false;
+ }
+
+ /** Abort and release resources held. Ignore all errors. */
+ void abort() {
+ clientRunning = false;
+ closeAllFilesBeingWritten(true);
+
+ try {
+ // remove reference to this client and stop the renewer,
+ // if there is no more clients under the renewer.
+ getLeaseRenewer().closeClient(this);
+ } catch (IOException ioe) {
+ LOG.info("Exception occurred while aborting the client. " + ioe);
+ }
+ RPC.stopProxy(rpcNamenode); // close connections to the namenode
+ }
+
+ /** Close/abort all files being written. */
+ private void closeAllFilesBeingWritten(final boolean abort) {
+ for(;;) {
+ final String src;
+ final DFSOutputStream out;
+ synchronized(filesBeingWritten) {
+ if (filesBeingWritten.isEmpty()) {
+ return;
+ }
+ src = filesBeingWritten.keySet().iterator().next();
+ out = filesBeingWritten.remove(src);
+ }
+ if (out != null) {
+ try {
+ if (abort) {
+ out.abort();
+ } else {
+ out.close();
+ }
+ } catch(IOException ie) {
+ LOG.error("Failed to " + (abort? "abort": "close") + " file " + src,
+ ie);
+ }
+ }
+ }
+ }
+
/**
* Close the file system, abandoning all of the leases and files being
* created and close connections to the namenode.
*/
public synchronized void close() throws IOException {
if(clientRunning) {
- leasechecker.close();
+ closeAllFilesBeingWritten(false);
clientRunning = false;
- try {
- leasechecker.interruptAndJoin();
- } catch (InterruptedException ie) {
- }
-
+
+ getLeaseRenewer().closeClient(this);
// close connections to the namenode
RPC.stopProxy(rpcNamenode);
}
@@ -349,14 +505,14 @@ public class DFSClient implements FSCons
/**
* Get {@link BlockReader} for short circuited local reads.
*/
- private static BlockReader getLocalBlockReader(Configuration conf,
+ private BlockReader getLocalBlockReader(Configuration conf,
String src, Block blk, Token<BlockTokenIdentifier> accessToken,
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
throws InvalidToken, IOException {
try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock);
+ - offsetIntoBlock, connectToDnViaHostname);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
@@ -711,10 +867,10 @@ public class DFSClient implements FSCons
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
- OutputStream result = new DFSOutputStream(src, masked,
+ final DFSOutputStream result = new DFSOutputStream(src, masked,
overwrite, createParent, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
- leasechecker.put(src, result);
+ beginFileLease(src, result);
return result;
}
@@ -736,6 +892,20 @@ public class DFSClient implements FSCons
}
/**
+ * Close status of a file
+ * @return true if file is already closed
+ */
+ public boolean isFileClosed(String src) throws IOException{
+ checkOpen();
+ try {
+ return namenode.isFileClosed(src);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class,
+ FileNotFoundException.class);
+ }
+ }
+
+ /**
* Append to an existing HDFS file.
*
* @param src file name
@@ -769,7 +939,7 @@ public class DFSClient implements FSCons
}
final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress,
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
- leasechecker.put(src, result);
+ beginFileLease(src, result);
return result;
}
@@ -792,6 +962,19 @@ public class DFSClient implements FSCons
DSQuotaExceededException.class);
}
}
+
+ /**
+ * Move blocks from src to trg and delete src
+ * See {@link ClientProtocol#concat(String, String [])}.
+ */
+ public void concat(String trg, String [] srcs) throws IOException {
+ checkOpen();
+ try {
+ namenode.concat(trg, srcs);
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class);
+ }
+ }
/**
* Rename file or directory.
@@ -880,6 +1063,60 @@ public class DFSClient implements FSCons
}
/**
+ * Return the socket addresses to use with each configured
+ * local interface. Local interfaces may be specified by IP
+ * address, IP address range using CIDR notation, interface
+ * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
+ * The socket addresses consist of the IPs for the interfaces
+ * and the ephemeral port (port 0). If an IP, IP range, or
+ * interface name matches an interface with sub-interfaces
+ * only the IP of the interface is used. Sub-interfaces can
+ * be used by specifying them explicitly (by IP or name).
+ *
+ * @return SocketAddresses for the configured local interfaces,
+ * or an empty array if none are configured
+ * @throws UnknownHostException if a given interface name is invalid
+ */
+ private static SocketAddress[] getLocalInterfaceAddrs(
+ String interfaceNames[]) throws UnknownHostException {
+ List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
+ for (String interfaceName : interfaceNames) {
+ if (IPAddressUtil.isIPv4LiteralAddress(interfaceName)) {
+ localAddrs.add(new InetSocketAddress(interfaceName, 0));
+ } else if (NetUtils.isValidSubnet(interfaceName)) {
+ for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
+ localAddrs.add(new InetSocketAddress(addr, 0));
+ }
+ } else {
+ for (String ip : DNS.getIPs(interfaceName, false)) {
+ localAddrs.add(new InetSocketAddress(ip, 0));
+ }
+ }
+ }
+ return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
+ }
+
+ /**
+ * Select one of the configured local interfaces at random. We use a random
+ * interface because other policies like round-robin are less effective
+ * given that we cache connections to datanodes.
+ *
+ * @return one of the local interface addresses at random, or null if no
+ * local interfaces are configured
+ */
+ private SocketAddress getRandomLocalInterfaceAddr() {
+ if (localInterfaceAddrs.length == 0) {
+ return null;
+ }
+ final int idx = r.nextInt(localInterfaceAddrs.length);
+ final SocketAddress addr = localInterfaceAddrs[idx];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using local interface " + addr);
+ }
+ return addr;
+ }
+
+ /**
* Get the checksum of a file.
* @param src The file path
* @return The checksum
@@ -887,7 +1124,7 @@ public class DFSClient implements FSCons
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, connectToDnViaHostname);
}
/**
@@ -898,6 +1135,12 @@ public class DFSClient implements FSCons
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
) throws IOException {
+ return getFileChecksum(src, namenode, socketFactory, socketTimeout, false);
+ }
+
+ private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
+ ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
//get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) {
@@ -930,6 +1173,7 @@ public class DFSClient implements FSCons
boolean done = false;
for(int j = 0; !done && j < datanodes.length; j++) {
+ final String dnName = datanodes[j].getName(connectToDnViaHostname);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
@@ -937,17 +1181,17 @@ public class DFSClient implements FSCons
try {
//connect to a datanode
sock = socketFactory.createSocket();
- NetUtils.connect(sock,
- NetUtils.createSocketAddr(datanodes[j].getName()), timeout);
+ LOG.debug("Connecting to " + dnName);
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnName), timeout);
sock.setSoTimeout(timeout);
out = new DataOutputStream(
- new BufferedOutputStream(NetUtils.getOutputStream(sock),
+ new BufferedOutputStream(NetUtils.getOutputStream(sock),
DataNode.SMALL_BUFFER_SIZE));
in = new DataInputStream(NetUtils.getInputStream(sock));
if (LOG.isDebugEnabled()) {
- LOG.debug("write to " + datanodes[j].getName() + ": "
+ LOG.debug("write to " + dnName + ": "
+ DataTransferProtocol.OP_BLOCK_CHECKSUM + ", block=" + block);
}
@@ -966,7 +1210,7 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+ "for file " + src + " for block " + block
- + " from datanode " + datanodes[j].getName()
+ + " from datanode " + dnName
+ ". Will retry the block once.");
}
lastRetriedIndex = i;
@@ -976,7 +1220,7 @@ public class DFSClient implements FSCons
break;
} else {
throw new IOException("Bad response " + reply + " for block "
- + block + " from datanode " + datanodes[j].getName());
+ + block + " from datanode " + dnName);
}
}
@@ -1007,12 +1251,10 @@ public class DFSClient implements FSCons
LOG.debug("set bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock);
}
- LOG.debug("got reply from " + datanodes[j].getName()
- + ": md5=" + md5);
+ LOG.debug("got reply from " + dnName + ": md5=" + md5);
}
} catch (IOException ie) {
- LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
- + datanodes[j].getName(), ie);
+ LOG.warn("src=" + src + ", datanodes[" + j + "]=" + dnName, ie);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
@@ -1287,118 +1529,7 @@ public class DFSClient implements FSCons
throw new IOException("No live nodes contain current block");
}
- boolean isLeaseCheckerStarted() {
- return leasechecker.daemon != null;
- }
-
- /** Lease management*/
- class LeaseChecker implements Runnable {
- /** A map from src -> DFSOutputStream of files that are currently being
- * written by this client.
- */
- private final SortedMap<String, OutputStream> pendingCreates
- = new TreeMap<String, OutputStream>();
-
- private Daemon daemon = null;
-
- synchronized void put(String src, OutputStream out) {
- if (clientRunning) {
- if (daemon == null) {
- daemon = new Daemon(this);
- daemon.start();
- }
- pendingCreates.put(src, out);
- }
- }
-
- synchronized void remove(String src) {
- pendingCreates.remove(src);
- }
-
- void interruptAndJoin() throws InterruptedException {
- Daemon daemonCopy = null;
- synchronized (this) {
- if (daemon != null) {
- daemon.interrupt();
- daemonCopy = daemon;
- }
- }
-
- if (daemonCopy != null) {
- LOG.debug("Wait for lease checker to terminate");
- daemonCopy.join();
- }
- }
-
- void close() {
- while (true) {
- String src;
- OutputStream out;
- synchronized (this) {
- if (pendingCreates.isEmpty()) {
- return;
- }
- src = pendingCreates.firstKey();
- out = pendingCreates.remove(src);
- }
- if (out != null) {
- try {
- out.close();
- } catch (IOException ie) {
- LOG.error("Exception closing file " + src+ " : " + ie, ie);
- }
- }
- }
- }
-
- private void renew() throws IOException {
- synchronized(this) {
- if (pendingCreates.isEmpty()) {
- return;
- }
- }
- namenode.renewLease(clientName);
- }
-
- /**
- * Periodically check in with the namenode and renew all the leases
- * when the lease period is half over.
- */
- public void run() {
- long lastRenewed = 0;
- while (clientRunning && !Thread.interrupted()) {
- if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
- try {
- renew();
- lastRenewed = System.currentTimeMillis();
- } catch (IOException ie) {
- LOG.warn("Problem renewing lease for " + clientName, ie);
- }
- }
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(this + " is interrupted.", ie);
- }
- return;
- }
- }
- }
-
- /** {@inheritDoc} */
- public String toString() {
- String s = getClass().getSimpleName();
- if (LOG.isTraceEnabled()) {
- return s + "@" + DFSClient.this + ": "
- + StringUtils.stringifyException(new Throwable("for testing"));
- }
- return s;
- }
- }
-
- /** Utility class to encapsulate data node info and its ip address. */
+ /** Utility class to encapsulate data node info and its address. */
private static class DNAddrPair {
DatanodeInfo info;
InetSocketAddress addr;
@@ -1840,79 +1971,136 @@ public class DFSClient implements FSCons
* Grab the open-file info from namenode
*/
synchronized void openInfo() throws IOException {
- LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+ for (int retries = 3; retries > 0; retries--) {
+ if (fetchLocatedBlocks()) {
+ // fetch block success
+ return;
+ } else {
+ // Last block location unavailable. When a cluster restarts,
+ // DNs may not report immediately. At this time partial block
+ // locations will not be available with NN for getting the length.
+          // Let's retry a few times to get the length.
+ DFSClient.LOG.warn("Last block locations unavailable. "
+ + "Datanodes might not have reported blocks completely."
+ + " Will retry for " + retries + " times");
+ waitFor(4000);
+ }
+ }
+ throw new IOException("Could not obtain the last block locations.");
+ }
+
+ private void waitFor(int waitTime) throws InterruptedIOException {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(
+ "Interrupted while getting the last block length.");
+ }
+ }
+
+ private boolean fetchLocatedBlocks() throws IOException,
+ FileNotFoundException {
+ LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
+ prefetchSize);
if (newInfo == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
- // I think this check is not correct. A file could have been appended to
- // between two calls to openInfo().
- if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
- !newInfo.isUnderConstruction()) {
- Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+ if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
+ && !newInfo.isUnderConstruction()) {
+ Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
+ .iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
- if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+ if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
- updateBlockInfo(newInfo);
+ boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
this.locatedBlocks = newInfo;
this.currentNode = null;
+ return isBlkInfoUpdated;
}
/**
* For files under construction, update the last block size based
* on the length of the block from the datanode.
*/
- private void updateBlockInfo(LocatedBlocks newInfo) {
+ private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
|| !(newInfo.locatedBlockCount() > 0)) {
- return;
+ return true;
}
LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
.getFileLength());
- if (!lastBlockInFile || last.getLocations().length <= 0) {
- return;
+ if (!lastBlockInFile) {
+ return true;
+ }
+
+ if (last.getLocations().length == 0) {
+ return false;
}
+
ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = last.getLocations()[0];
- try {
- primary = createClientDatanodeProtocolProxy(primaryNode, conf,
- last.getBlock(), last.getBlockToken(), socketTimeout);
- Block newBlock = primary.getBlockInfo(last.getBlock());
- long newBlockSize = newBlock.getNumBytes();
- long delta = newBlockSize - last.getBlockSize();
- // if the size of the block on the datanode is different
- // from what the NN knows about, the datanode wins!
- last.getBlock().setNumBytes(newBlockSize);
- long newlength = newInfo.getFileLength() + delta;
- newInfo.setFileLength(newlength);
- LOG.debug("DFSClient setting last block " + last + " to length "
- + newBlockSize + " filesize is now " + newInfo.getFileLength());
- } catch (IOException e) {
- if (e.getMessage().startsWith(
- "java.io.IOException: java.lang.NoSuchMethodException: "
- + "org.apache.hadoop.hdfs.protocol"
- + ".ClientDatanodeProtocol.getBlockInfo")) {
- // We're talking to a server that doesn't implement HDFS-200.
- serverSupportsHdfs200 = false;
- } else {
- LOG.debug("DFSClient file " + src
- + " is being concurrently append to" + " but datanode "
- + primaryNode.getHostName() + " probably does not have block "
- + last.getBlock());
+ Block newBlock = null;
+ for (int i = 0; i < last.getLocations().length && newBlock == null; i++) {
+ DatanodeInfo datanode = last.getLocations()[i];
+ try {
+ primary = createClientDatanodeProtocolProxy(datanode, conf, last
+ .getBlock(), last.getBlockToken(), socketTimeout,
+ connectToDnViaHostname);
+ newBlock = primary.getBlockInfo(last.getBlock());
+ } catch (IOException e) {
+ if (e.getMessage().startsWith(
+ "java.io.IOException: java.lang.NoSuchMethodException: "
+ + "org.apache.hadoop.hdfs.protocol"
+ + ".ClientDatanodeProtocol.getBlockInfo")) {
+ // We're talking to a server that doesn't implement HDFS-200.
+ serverSupportsHdfs200 = false;
+ } else {
+ LOG.info("Failed to get block info from "
+ + datanode.getHostName() + " probably does not have "
+ + last.getBlock(), e);
+ }
+ } finally {
+ if (primary != null) {
+ RPC.stopProxy(primary);
+ }
}
}
+
+ if (newBlock == null) {
+ if (!serverSupportsHdfs200) {
+ return true;
+ }
+ throw new IOException(
+ "Failed to get block info from any of the DN in pipeline: "
+ + Arrays.toString(last.getLocations()));
+ }
+
+ long newBlockSize = newBlock.getNumBytes();
+ long delta = newBlockSize - last.getBlockSize();
+ // if the size of the block on the datanode is different
+ // from what the NN knows about, the datanode wins!
+ last.getBlock().setNumBytes(newBlockSize);
+ long newlength = newInfo.getFileLength() + delta;
+ newInfo.setFileLength(newlength);
+ LOG.debug("DFSClient setting last block " + last + " to length "
+ + newBlockSize + " filesize is now " + newInfo.getFileLength());
+ return true;
}
public synchronized long getFileLength() {
return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
}
+ private synchronized boolean blockUnderConstruction() {
+ return locatedBlocks.isUnderConstruction();
+ }
+
/**
* Returns the datanode from which the stream is currently reading.
*/
@@ -2024,10 +2212,9 @@ public class DFSClient implements FSCons
private boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
throws IOException {
- if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
- return true;
- }
- return false;
+ // Can't local read a block under construction, see HDFS-2757
+ return shortCircuitLocalReads && !blockUnderConstruction()
+ && isLocalAddress(targetAddr);
}
/**
@@ -2086,7 +2273,7 @@ public class DFSClient implements FSCons
fetchBlockAt(target);
continue;
} else {
- LOG.info("Failed to read block " + targetBlock.getBlock()
+ LOG.info("Failed to read " + targetBlock.getBlock()
+ " on local machine" + StringUtils.stringifyException(ex));
LOG.info("Try reading via the datanode on " + targetAddr);
}
@@ -2095,7 +2282,9 @@ public class DFSClient implements FSCons
try {
s = socketFactory.createSocket();
- NetUtils.connect(s, targetAddr, socketTimeout);
+ LOG.debug("Connecting to " + targetAddr);
+ NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(),
+ socketTimeout);
s.setSoTimeout(socketTimeout);
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
accessToken,
@@ -2225,7 +2414,7 @@ public class DFSClient implements FSCons
if (pos > blockEnd) {
currentNode = blockSeekTo(pos);
}
- int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+ int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
int result = readBuffer(buf, off, realLen);
if (result >= 0) {
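
The realLen change above is an integer-overflow fix: blockEnd - pos + 1 can exceed Integer.MAX_VALUE for blocks larger than 2 GB, and the old (int) cast truncated that sum before Math.min ran. A small demonstration, assuming a 3 GB block:

    public class MinOverflowDemo {
      public static void main(String[] args) {
        long blockEnd = 3L * 1024 * 1024 * 1024 - 1; // last byte of a 3 GB block
        long pos = 0;
        int len = 4096;
        // Old form: the (int) cast truncates the long sum before Math.min
        // sees it, yielding a large negative "length".
        int broken = Math.min(len, (int) (blockEnd - pos + 1));
        // New form: compare in long arithmetic, then cast the small result.
        int fixed = (int) Math.min((long) len, blockEnd - pos + 1L);
        System.out.println(broken + " vs " + fixed); // -1073741824 vs 4096
      }
    }
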
@@ -2262,8 +2451,8 @@ public class DFSClient implements FSCons
DatanodeInfo[] nodes = block.getLocations();
try {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
- InetSocketAddress targetAddr =
- NetUtils.createSocketAddr(chosenNode.getName());
+ InetSocketAddress targetAddr =
+ NetUtils.createSocketAddr(chosenNode.getName(connectToDnViaHostname));
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
String blockInfo = block.getBlock() + " file=" + src;
@@ -2272,9 +2461,9 @@ public class DFSClient implements FSCons
}
if (nodes == null || nodes.length == 0) {
- LOG.info("No node available for block: " + blockInfo);
+ LOG.info("No node available for: " + blockInfo);
}
- LOG.info("Could not obtain block " + block.getBlock()
+ LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + ie
+ ". Will get new block locations from namenode and retry...");
try {
@@ -2326,7 +2515,9 @@ public class DFSClient implements FSCons
} else {
// go to the datanode
dn = socketFactory.createSocket();
- NetUtils.connect(dn, targetAddr, socketTimeout);
+ LOG.debug("Connecting to " + targetAddr);
+ NetUtils.connect(dn, targetAddr, getRandomLocalInterfaceAddr(),
+ socketTimeout);
dn.setSoTimeout(socketTimeout);
reader = RemoteBlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(), accessToken,
@@ -3046,12 +3237,12 @@ public class DFSClient implements FSCons
return false;
}
if (response != null) {
- LOG.info("Error Recovery for block " + block +
+ LOG.info("Error Recovery for " + block +
" waiting for responder to exit. ");
return true;
}
if (errorIndex >= 0) {
- LOG.warn("Error Recovery for block " + block
+ LOG.warn("Error Recovery for " + block
+ " bad datanode[" + errorIndex + "] "
+ (nodes == null? "nodes == null": nodes[errorIndex].getName()));
}
@@ -3124,7 +3315,7 @@ public class DFSClient implements FSCons
// to each DN and two rpcs to the NN.
int recoveryTimeout = (newnodes.length * 2 + 2) * socketTimeout;
primary = createClientDatanodeProtocolProxy(primaryNode, conf, block,
- accessToken, recoveryTimeout);
+ accessToken, recoveryTimeout, connectToDnViaHostname);
newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
LOG.warn("Failed recovery attempt #" + recoveryErrorCount +
@@ -3252,8 +3443,17 @@ public class DFSClient implements FSCons
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try {
- namenode.create(
- src, masked, clientName, overwrite, createParent, replication, blockSize);
+ // Make sure the regular create() is done through the old create().
+ // This is done to ensure that newer clients (post-1.0) can talk to
+ // older clusters (pre-1.0). Older clusters lack the new create()
+ // method accepting createParent as one of the arguments.
+ if (createParent) {
+ namenode.create(
+ src, masked, clientName, overwrite, replication, blockSize);
+ } else {
+ namenode.create(
+ src, masked, clientName, overwrite, false, replication, blockSize);
+ }
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
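
The create() hunk above preserves wire compatibility by routing the default createParent=true case through the pre-1.0 RPC signature. The dispatch idea in isolation, with OldNameNode and NewNameNode as hypothetical stand-ins for the two ClientProtocol versions:

    /** Hypothetical stand-ins for the pre-1.0 and post-1.0 ClientProtocol. */
    interface OldNameNode {
      void create(String src, boolean overwrite);
    }
    interface NewNameNode extends OldNameNode {
      void create(String src, boolean overwrite, boolean createParent);
    }

    final class CompatCreate {
      static void create(NewNameNode nn, String src, boolean overwrite,
                         boolean createParent) {
        if (createParent) {
          // The default case uses the old signature so that a pre-1.0
          // server, which lacks the createParent overload, still works.
          nn.create(src, overwrite);
        } else {
          // Only non-default behaviour needs the new overload.
          nn.create(src, overwrite, false);
        }
      }
    }
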
@@ -3388,7 +3588,7 @@ public class DFSClient implements FSCons
success = createBlockOutputStream(nodes, clientName, false);
if (!success) {
- LOG.info("Abandoning block " + block);
+ LOG.info("Abandoning " + block);
namenode.abandonBlock(block, src, clientName);
if (errorIndex < nodes.length) {
@@ -3425,16 +3625,19 @@ public class DFSClient implements FSCons
boolean result = false;
try {
- LOG.debug("Connecting to " + nodes[0].getName());
- InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
+ final String dnName = nodes[0].getName(connectToDnViaHostname);
+ InetSocketAddress target = NetUtils.createSocketAddr(dnName);
s = socketFactory.createSocket();
- timeoutValue = 3000 * nodes.length + socketTimeout;
- NetUtils.connect(s, target, timeoutValue);
+ timeoutValue = (socketTimeout > 0) ?
+ (3000 * nodes.length + socketTimeout) : 0;
+ LOG.debug("Connecting to " + dnName);
+ NetUtils.connect(s, target, getRandomLocalInterfaceAddr(), timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
LOG.debug("Send buf size " + s.getSendBufferSize());
- long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
- datanodeWriteTimeout;
+ long writeTimeout = (datanodeWriteTimeout > 0) ?
+ (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
+ datanodeWriteTimeout) : 0;
//
// Xmit header info to datanode
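
Both timeout computations above now treat a base value of 0 as "timeout disabled": scaling by pipeline length must not turn 0 into a small positive timeout. A sketch of the rule, with illustrative numbers:

    final class Timeouts {
      /**
       * Timeouts grow with the pipeline length, but a base of 0
       * ("no timeout") must stay 0 rather than become a short timeout.
       */
      static long scaled(long base, long perNode, int nodes) {
        return (base > 0) ? (perNode * nodes + base) : 0;
      }

      public static void main(String[] args) {
        System.out.println(scaled(69000, 3000, 3)); // 78000 ms
        System.out.println(scaled(0, 3000, 3));     // 0: still disabled
      }
    }
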
@@ -3817,12 +4020,12 @@ public class DFSClient implements FSCons
throw e;
}
closeInternal();
- leasechecker.remove(src);
if (s != null) {
s.close();
s = null;
}
+ endFileLease(src);
}
/**
@@ -3835,6 +4038,20 @@ public class DFSClient implements FSCons
closed = true;
}
+ /**
+ * Aborts this output stream and releases any system
+ * resources associated with this stream.
+ */
+ synchronized void abort() throws IOException {
+ if (closed) {
+ return;
+ }
+ setLastException(new IOException("Lease timeout of "
+ + (hdfsTimeout / 1000) + " seconds expired."));
+ closeThreads();
+ endFileLease(src);
+ }
+
// shutdown datastreamer and responseprocessor threads.
private void closeThreads() throws IOException {
try {
@@ -3903,10 +4120,20 @@ public class DFSClient implements FSCons
while (!fileComplete) {
fileComplete = namenode.complete(src, clientName);
if (!fileComplete) {
+ if (!clientRunning ||
+ (hdfsTimeout > 0 &&
+ localstart + hdfsTimeout < System.currentTimeMillis())) {
+ String msg = "Unable to close file because dfsclient " +
+ " was unable to contact the HDFS servers." +
+ " clientRunning " + clientRunning +
+ " hdfsTimeout " + hdfsTimeout;
+ LOG.info(msg);
+ throw new IOException(msg);
+ }
try {
Thread.sleep(400);
if (System.currentTimeMillis() - localstart > 5000) {
- LOG.info("Could not complete file " + src + " retrying...");
+ LOG.info("Could not complete " + src + " retrying...");
}
} catch (InterruptedException ie) {
}
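
The completion loop above now gives up once hdfsTimeout elapses instead of retrying namenode.complete() forever against unreachable servers. The pattern in isolation, with "complete" as a hypothetical stand-in for the RPC:

    import java.io.IOException;
    import java.util.function.BooleanSupplier;

    final class BoundedWait {
      /**
       * Poll until the server reports completion, but give up after an
       * overall deadline instead of spinning forever. A timeout of 0 or
       * less means "wait indefinitely", matching the hdfsTimeout check.
       */
      static void awaitCompletion(BooleanSupplier complete, long timeoutMs)
          throws IOException, InterruptedException {
        long start = System.currentTimeMillis();
        while (!complete.getAsBoolean()) {
          if (timeoutMs > 0 && System.currentTimeMillis() - start > timeoutMs) {
            throw new IOException("Unable to complete within " + timeoutMs + " ms");
          }
          Thread.sleep(400); // same pacing as the loop above
        }
      }
    }
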
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jun 21 06:37:27 2013
@@ -36,9 +36,15 @@ public class DFSConfigKeys extends Commo
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+ public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.client.retry.policy.enabled";
+ public static final boolean DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false;
+ public static final String DFS_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.client.retry.policy.spec";
+ public static final String DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
-
+ public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
+ public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
+
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
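
Per the t1,n1,t2,n2,... comment, the dfs.client.retry.policy.spec default "10000,6,60000,10" reads as: retry up to 6 times sleeping 10 s between attempts, then up to 10 more times sleeping 60 s. A sketch of how such a spec might decompose into steps (RetrySpec is illustrative, not the Hadoop RetryPolicies API):

    import java.util.ArrayList;
    import java.util.List;

    final class RetrySpec {
      /** One (sleepMillis, retries) step of a multi-step retry policy. */
      static final class Step {
        final long sleepMillis;
        final int retries;
        Step(long sleepMillis, int retries) {
          this.sleepMillis = sleepMillis;
          this.retries = retries;
        }
      }

      /** Split "t1,n1,t2,n2,..." into steps, e.g. "10000,6,60000,10". */
      static List<Step> parse(String spec) {
        String[] parts = spec.split(",");
        List<Step> steps = new ArrayList<Step>();
        for (int i = 0; i + 1 < parts.length; i += 2) {
          steps.add(new Step(Long.parseLong(parts[i].trim()),
                             Integer.parseInt(parts[i + 1].trim())));
        }
        return steps;
      }
    }
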
@@ -46,6 +52,15 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+ public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+ public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
+ public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+ public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+ public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+ public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+ public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+ public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+
public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
@@ -58,6 +73,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_SAFEMODE_EXTENSION_DEFAULT = 30000;
public static final String DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
public static final float DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
+ public static final String DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
+ public static final int DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY = "dfs.namenode.secondary.http-address";
public static final String DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
public static final String DFS_NAMENODE_CHECKPOINT_PERIOD_KEY = "dfs.namenode.checkpoint.period";
@@ -90,6 +107,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
+ public static final String DFS_PERSIST_BLOCKS_KEY = "dfs.persist.blocks";
+ public static final boolean DFS_PERSIST_BLOCKS_DEFAULT = false;
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
public static final String DFS_ADMIN = "dfs.cluster.administrators";
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
@@ -101,6 +120,10 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
+ public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
+ public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
+ public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
+ public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
//Delegation token related keys
public static final String DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
@@ -109,6 +132,8 @@ public class DFSConfigKeys extends Commo
public static final long DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 24*60*60*1000;
public static final String DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY = "dfs.namenode.delegation.token.max-lifetime";
public static final long DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 7*24*60*60*1000;
+ public static final String DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY = "dfs.namenode.delegation.token.always-use"; // for tests
+ public static final boolean DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT = false;
//Following keys have no defaults
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
@@ -119,7 +144,6 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
public static final String DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static final String DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size";
- public static final String DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
public static final String DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
public static final String DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
public static final String DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
@@ -128,6 +152,25 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY = "dfs.namenode.checkpoint.dir";
public static final String DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
+ public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
+
+ // Whether to enable datanode's stale state detection and usage for reads
+ public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
+ public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
+ // Whether to enable datanode's stale state detection and usage for writes
+ public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+ public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+ // The default value of the time interval for marking datanodes as stale
+ public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+ public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+ // The stale interval cannot be too small, since too small an interval causes frequent churn in stale state.
+ // This value is a multiple of the heartbeat interval and defines the minimum allowed stale interval.
+ public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+ public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+ // When the percentage of stale datanodes reaches this ratio,
+ // allow writing to stale nodes to prevent hotspots.
+ public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+ public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
//Code in hdfs is not updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
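
The comments above tie the stale-datanode keys together: the configured interval is meant to be clamped to at least DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT (3) times the heartbeat interval, i.e. 9 s for a 3 s heartbeat. A sketch of that clamping rule (names are illustrative, not the NameNode code):

    final class StaleIntervalCheck {
      /**
       * Clamp the configured stale interval to at least
       * minMultiplier * heartbeatIntervalMs, e.g. 3 * 3000 ms = 9 s.
       */
      static long effectiveStaleInterval(long configuredMs,
                                         long heartbeatIntervalMs,
                                         int minMultiplier) {
        return Math.max(configuredMs, minMultiplier * heartbeatIntervalMs);
      }

      public static void main(String[] args) {
        // The 30 s default comfortably clears the 9 s floor.
        System.out.println(effectiveStaleInterval(30000, 3000, 3)); // 30000
      }
    }
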
@@ -174,8 +217,6 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
public static final int DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
- public static final String DFS_SUPPORT_APPEND_KEY = "dfs.support.append";
- public static final boolean DFS_SUPPORT_APPEND_DEFAULT = false;
public static final String DFS_HTTPS_ENABLE_KEY = "dfs.https.enable";
public static final boolean DFS_HTTPS_ENABLE_DEFAULT = false;
public static final String DFS_DEFAULT_CHUNK_VIEW_SIZE_KEY = "dfs.default.chunk.view.size";
@@ -185,6 +226,14 @@ public class DFSConfigKeys extends Commo
public static final String DFS_DATANODE_IPC_ADDRESS_KEY = "dfs.datanode.ipc.address";
public static final String DFS_DATANODE_IPC_ADDRESS_DEFAULT = "0.0.0.0:50020";
+ //Replication monitoring related keys
+ public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
+ "dfs.namenode.invalidate.work.pct.per.iteration";
+ public static final float DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT = 0.32f;
+ public static final String DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION =
+ "dfs.namenode.replication.work.multiplier.per.iteration";
+ public static final int DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT = 2;
+
public static final String DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY = "dfs.block.access.token.enable";
public static final boolean DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT = false;
public static final String DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY = "dfs.block.access.key.update.interval";
@@ -202,11 +251,18 @@ public class DFSConfigKeys extends Commo
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
public static final int DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT = 100;
+ public static final String DFS_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
+ public static final int DFS_MAX_CORRUPT_FILES_RETURNED_DEFAULT = 500;
+
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
+ public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
+ "dfs.image.transfer.bandwidthPerSec";
+ public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0; //no throttling
+
//Keys with no defaults
public static final String DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
public static final String DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
@@ -219,14 +275,16 @@ public class DFSConfigKeys extends Commo
public static final String DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
public static final String DFS_NAMENODE_USER_NAME_KEY = "dfs.namenode.kerberos.principal";
public static final String DFS_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.namenode.kerberos.https.principal";
+ public static final String DFS_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY = "dfs.namenode.kerberos.internal.spnego.principal";
public static final String DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY = "dfs.secondary.namenode.keytab.file";
public static final String DFS_SECONDARY_NAMENODE_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.principal";
public static final String DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
+ public static final String DFS_SECONDARY_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.internal.spnego.principal";
public static final String DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
public static final int DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
- public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+ public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
}
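
The new client networking keys in this file might be wired up as below; DFS_CLIENT_USE_DN_HOSTNAME and DFS_CLIENT_LOCAL_INTERFACES are the constants added above, while the "eth0,eth1" value format is an assumption for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ClientNetworkingConf {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Dial datanodes by hostname rather than IP, useful for
        // multi-homed or NATed clusters; off by default per the key above.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, true);
        // Spread outgoing client connections over specific local
        // interfaces; the "eth0,eth1" value format is an assumption here.
        conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, "eth0,eth1");
      }
    }
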
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/DFSUtil.java Fri Jun 21 06:37:27 2013
@@ -22,8 +22,10 @@ import java.io.UnsupportedEncodingExcept
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Comparator;
import java.util.StringTokenizer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -123,5 +125,75 @@ public class DFSUtil {
throw new IllegalArgumentException(ue);
}
}
+
+ /**
+ * Get DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION from configuration.
+ *
+ * @param conf Configuration
+ * @return Value of DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION
+ * @throws IllegalArgumentException If the value is not in the range (0, 1]
+ */
+ public static float getInvalidateWorkPctPerIteration(Configuration conf) {
+ float blocksInvalidateWorkPct = conf.getFloat(
+ DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
+ DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION_DEFAULT);
+ if (blocksInvalidateWorkPct <= 0.0f || blocksInvalidateWorkPct > 1.0f) {
+ throw new IllegalArgumentException(
+ DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION + " = '"
+ + blocksInvalidateWorkPct + "' is invalid. "
+ + "It should be a positive, non-zero float value "
+ + "indicating a percentage.");
+ }
+ return blocksInvalidateWorkPct;
+ }
+
+ /**
+ * Get DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+ * from configuration.
+ *
+ * @param conf Configuration
+ * @return Value of DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+ * @throws IllegalArgumentException If the value is not positive
+ */
+ public static int getReplWorkMultiplier(Configuration conf) {
+ int blocksReplWorkMultiplier = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+ DFSConfigKeys.
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+ if (blocksReplWorkMultiplier <= 0) {
+ throw new IllegalArgumentException(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+ + " = '" + blocksReplWorkMultiplier + "' is invalid. "
+ + "It should be a positive, non-zero integer value.");
+ }
+ return blocksReplWorkMultiplier;
+ }
+
+ /**
+ * Comparator for sorting DatanodeInfo[] based on stale states. Stale nodes
+ * are moved to the end of the array on sorting with this comparator.
+ */
+ public static class StaleComparator implements Comparator<DatanodeInfo> {
+ private long staleInterval;
+
+ /**
+ * Constructor of StaleComparator
+ *
+ * @param interval
+ * The time interval for marking datanodes as stale; supplied by the
+ * caller because it may change dynamically
+ */
+ public StaleComparator(long interval) {
+ this.staleInterval = interval;
+ }
+
+ @Override
+ public int compare(DatanodeInfo a, DatanodeInfo b) {
+ // Stale nodes will be moved behind the normal nodes
+ boolean aStale = a.isStale(staleInterval);
+ boolean bStale = b.isStale(staleInterval);
+ return aStale == bStale ? 0 : (aStale ? 1 : -1);
+ }
+ }
}
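
A hypothetical use of the new StaleComparator: sort a block's replica list so that nodes which have missed heartbeats for longer than the stale interval are tried last. Arrays.sort is stable for objects, so the comparator's 0-for-equal result preserves any existing (e.g. topology-based) ordering among equally stale nodes:

    import java.util.Arrays;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class StaleSortExample {
      /** Push stale replicas to the back so fresh nodes are tried first. */
      static void sortReplicas(DatanodeInfo[] nodes, long staleIntervalMs) {
        // Arrays.sort is stable, so nodes that are equally (non-)stale
        // keep whatever order (e.g. network topology) they already had.
        Arrays.sort(nodes, new DFSUtil.StaleComparator(staleIntervalMs));
      }
    }
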