Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:07 UTC
svn commit: r1399950 [6/27] - in
/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apach...
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.Random;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.logging.Log;
@@ -42,6 +43,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
public class TestBookKeeperConfiguration {
private static final Log LOG = LogFactory
.getLog(TestBookKeeperConfiguration.class);
@@ -73,6 +76,11 @@ public class TestBookKeeperConfiguration
return zkc;
}
+ private NamespaceInfo newNSInfo() {
+ Random r = new Random();
+ return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+ }
+
@BeforeClass
public static void setupZooKeeper() throws Exception {
// create a ZooKeeper server(dataDir, dataLogDir, port)
@@ -137,8 +145,10 @@ public class TestBookKeeperConfiguration
bkAvailablePath);
Assert.assertNull(bkAvailablePath + " already exists", zkc.exists(
bkAvailablePath, false));
- bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://"
- + HOSTPORT + "/hdfsjournal-WithBKPath"));
+ NamespaceInfo nsi = newNSInfo();
+ bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
+ nsi);
Assert.assertNotNull("Bookie available path : " + bkAvailablePath
+ " doesn't exists", zkc.exists(bkAvailablePath, false));
}
@@ -152,8 +162,10 @@ public class TestBookKeeperConfiguration
Configuration conf = new Configuration();
Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
BK_ROOT_PATH, false));
- new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT
- + "/hdfsjournal-DefaultBKPath"));
+ NamespaceInfo nsi = newNSInfo();
+ bkjm = new BookKeeperJournalManager(conf,
+ URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
+ nsi);
Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
+ " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
}
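The pattern above recurs throughout this commit: BookKeeperJournalManager gains a required NamespaceInfo constructor argument, so every construction site now passes one. A minimal sketch of the new call shape, using arbitrary test values exactly as newNSInfo() does (the three-argument constructor and the HOSTPORT test constant are taken from this diff; the journal path is illustrative only):

    // arbitrary namespace identity for a test, as in newNSInfo()
    NamespaceInfo nsi = new NamespaceInfo(new Random().nextInt(),
        "testCluster", "TestBPID", -1);
    // hypothetical journal path; HOSTPORT is the test ZooKeeper address
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-example"), nsi);
    bkjm.close();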
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.zookeeper.CreateMode;
@@ -78,10 +80,17 @@ public class TestBookKeeperJournalManage
zkc.close();
}
+ private NamespaceInfo newNSInfo() {
+ Random r = new Random();
+ return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+ }
+
@Test
public void testSimpleWrite() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
+ BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
+
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -99,8 +108,10 @@ public class TestBookKeeperJournalManage
@Test
public void testNumberOfTransactions() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
+
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
+ BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -116,8 +127,10 @@ public class TestBookKeeperJournalManage
@Test
public void testNumberOfTransactionsWithGaps() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
+ BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
+
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
@@ -151,8 +164,10 @@ public class TestBookKeeperJournalManage
@Test
public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
+ BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
+
long txid = 1;
for (long i = 0; i < 3; i++) {
long start = txid;
@@ -190,8 +205,10 @@ public class TestBookKeeperJournalManage
*/
@Test
public void testWriteRestartFrom1() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
+ BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
+
long txid = 1;
long start = txid;
EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -245,11 +262,15 @@ public class TestBookKeeperJournalManage
@Test
public void testTwoWriters() throws Exception {
long start = 1;
+ NamespaceInfo nsi = newNSInfo();
+
BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
+
BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
-
+ BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
+
+
EditLogOutputStream out1 = bkjm1.startLogSegment(start);
try {
bkjm2.startLogSegment(start);
@@ -263,8 +284,11 @@ public class TestBookKeeperJournalManage
@Test
public void testSimpleRead() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
+ BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
+ nsi);
+
final long numTransactions = 10000;
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= numTransactions; i++) {
@@ -287,8 +311,11 @@ public class TestBookKeeperJournalManage
@Test
public void testSimpleRecovery() throws Exception {
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
+ BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
+ nsi);
+
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1 ; i <= 100; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -334,8 +361,10 @@ public class TestBookKeeperJournalManage
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
+ BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
+ nsi);
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
@@ -416,8 +445,12 @@ public class TestBookKeeperJournalManage
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
ensembleSize);
long txid = 1;
+
+ NamespaceInfo nsi = newNSInfo();
BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
- BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
+ BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
+ nsi);
+
EditLogOutputStream out = bkjm.startLogSegment(txid);
for (long i = 1 ; i <= 3; i++) {
FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -464,7 +497,9 @@ public class TestBookKeeperJournalManage
@Test
public void testEmptyInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ NamespaceInfo nsi = newNSInfo();
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+ nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
@@ -481,7 +516,7 @@ public class TestBookKeeperJournalManage
String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, new byte[0], -1);
- bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm = new BookKeeperJournalManager(conf, uri, nsi);
try {
bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating"
@@ -489,7 +524,7 @@ public class TestBookKeeperJournalManage
} catch (IOException e) {
// correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains(
- "Invalid ledger entry,"));
+ "Invalid/Incomplete data in znode"));
} finally {
bkjm.close();
}
@@ -503,7 +538,9 @@ public class TestBookKeeperJournalManage
@Test
public void testCorruptInprogressNode() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ NamespaceInfo nsi = newNSInfo();
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+ nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
@@ -521,7 +558,7 @@ public class TestBookKeeperJournalManage
String inprogressZNode = bkjm.inprogressZNode(101);
zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
- bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm = new BookKeeperJournalManager(conf, uri, nsi);
try {
bkjm.recoverUnfinalizedSegments();
fail("Should have failed. There should be no way of creating"
@@ -529,8 +566,7 @@ public class TestBookKeeperJournalManage
} catch (IOException e) {
// correct behaviour
assertTrue("Exception different than expected", e.getMessage().contains(
- "Invalid ledger entry,"));
-
+ "has no field named"));
} finally {
bkjm.close();
}
@@ -544,7 +580,9 @@ public class TestBookKeeperJournalManage
@Test
public void testEmptyInprogressLedger() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ NamespaceInfo nsi = newNSInfo();
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+ nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
@@ -559,7 +597,7 @@ public class TestBookKeeperJournalManage
out.close();
bkjm.close();
- bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments();
out = bkjm.startLogSegment(101);
for (long i = 1; i <= 100; i++) {
@@ -581,7 +619,9 @@ public class TestBookKeeperJournalManage
public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
URI uri = BKJMUtil
.createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ NamespaceInfo nsi = newNSInfo();
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+ nsi);
EditLogOutputStream out = bkjm.startLogSegment(1);
for (long i = 1; i <= 100; i++) {
@@ -601,7 +641,7 @@ public class TestBookKeeperJournalManage
byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
// finalize
- bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments();
bkjm.close();
@@ -613,7 +653,7 @@ public class TestBookKeeperJournalManage
CreateMode.PERSISTENT);
// should work fine
- bkjm = new BookKeeperJournalManager(conf, uri);
+ bkjm = new BookKeeperJournalManager(conf, uri, nsi);
bkjm.recoverUnfinalizedSegments();
bkjm.close();
}
@@ -626,7 +666,10 @@ public class TestBookKeeperJournalManage
@Test
public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
- BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+ NamespaceInfo nsi = newNSInfo();
+ BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+ nsi);
+
try {
// start new inprogress log segment with txid=1
// and write transactions till txid=50
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
bin=`which $0`
bin=`dirname ${bin}`
-bin=`cd "$bin"; pwd`
+bin=`cd "$bin" > /dev/null; pwd`
DEFAULT_LIBEXEC_DIR="$bin"/../libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
@@ -30,6 +30,7 @@ function print_usage(){
echo " namenode -format format the DFS filesystem"
echo " secondarynamenode run the DFS secondary namenode"
echo " namenode run the DFS namenode"
+ echo " journalnode run the DFS journalnode"
echo " zkfc run the ZK Failover Controller daemon"
echo " datanode run a DFS datanode"
echo " dfsadmin run a DFS admin client"
@@ -90,6 +91,9 @@ elif [ "$COMMAND" = "datanode" ] ; then
else
HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
fi
+elif [ "$COMMAND" = "journalnode" ] ; then
+ CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
elif [ "$COMMAND" = "dfs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh Fri Oct 19 02:25:55 2012
@@ -61,4 +61,14 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
--script "$bin/hdfs" stop secondarynamenode
fi
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+ echo "Stopping ZK Failover Controllers on NN hosts [$NAMENODES]"
+ "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+ --config "$HADOOP_CONF_DIR" \
+ --hostnames "$NAMENODES" \
+ --script "$bin/hdfs" stop zkfc
+fi
# eof
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties Fri Oct 19 02:25:55 2012
@@ -19,7 +19,7 @@
# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
-# default sampling period
+# default sampling period, in seconds
*.period=10
# The namenode-metrics.out will contain metrics from all context
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml Fri Oct 19 02:25:55 2012
@@ -332,13 +332,12 @@ package org.apache.hadoop.fs;
import org.junit.Test;
import org.junit.Before;
-import junit.framework.TestCase;
-public class DemoFiTest extends TestCase {
+public class DemoFiTest {
public static final String BLOCK_RECEIVER_FAULT="hdfs.datanode.BlockReceiver";
@Override
@Before
- public void setUp(){
+ public void setUp() {
//Setting up the test's environment as required
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml Fri Oct 19 02:25:55 2012
@@ -1,110 +1,110 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
- "http://forrest.apache.org/dtd/document-v20.dtd">
-
-<document>
-<header>
-<title>C API libhdfs</title>
-<meta name="http-equiv">Content-Type</meta>
-<meta name="content">text/html;</meta>
-<meta name="charset">utf-8</meta>
-</header>
-<body>
-<section>
-<title>Overview</title>
-
-<p>
-libhdfs is a JNI based C API for Hadoop's Distributed File System (HDFS).
-It provides C APIs to a subset of the HDFS APIs to manipulate HDFS files and
-the filesystem. libhdfs is part of the Hadoop distribution and comes
-pre-compiled in ${HADOOP_PREFIX}/libhdfs/libhdfs.so .
-</p>
-
-</section>
-<section>
-<title>The APIs</title>
-
-<p>
-The libhdfs APIs are a subset of: <a href="api/org/apache/hadoop/fs/FileSystem.html" >hadoop fs APIs</a>.
-</p>
-<p>
-The header file for libhdfs describes each API in detail and is available in ${HADOOP_PREFIX}/src/c++/libhdfs/hdfs.h
-</p>
-</section>
-<section>
-<title>A Sample Program</title>
-
-<source>
-#include "hdfs.h"
-
-int main(int argc, char **argv) {
-
- hdfsFS fs = hdfsConnect("default", 0);
- const char* writePath = "/tmp/testfile.txt";
- hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
- if(!writeFile) {
- fprintf(stderr, "Failed to open %s for writing!\n", writePath);
- exit(-1);
- }
- char* buffer = "Hello, World!";
- tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
- if (hdfsFlush(fs, writeFile)) {
- fprintf(stderr, "Failed to 'flush' %s\n", writePath);
- exit(-1);
- }
- hdfsCloseFile(fs, writeFile);
-}
-</source>
-</section>
-
-<section>
-<title>How To Link With The Library</title>
-<p>
-See the Makefile for hdfs_test.c in the libhdfs source directory (${HADOOP_PREFIX}/src/c++/libhdfs/Makefile) or something like:<br />
-gcc above_sample.c -I${HADOOP_PREFIX}/src/c++/libhdfs -L${HADOOP_PREFIX}/libhdfs -lhdfs -o above_sample
-</p>
-</section>
-<section>
-<title>Common Problems</title>
-<p>
-The most common problem is the CLASSPATH is not set properly when calling a program that uses libhdfs.
-Make sure you set it to all the Hadoop jars needed to run Hadoop itself. Currently, there is no way to
-programmatically generate the classpath, but a good bet is to include all the jar files in ${HADOOP_PREFIX}
-and ${HADOOP_PREFIX}/lib as well as the right configuration directory containing hdfs-site.xml
-</p>
-</section>
-<section>
-<title>Thread Safe</title>
-<p>libhdfs is thread safe.</p>
-<ul>
-<li>Concurrency and Hadoop FS "handles"
-<br />The Hadoop FS implementation includes a FS handle cache which caches based on the URI of the
-namenode along with the user connecting. So, all calls to hdfsConnect will return the same handle but
-calls to hdfsConnectAsUser with different users will return different handles. But, since HDFS client
-handles are completely thread safe, this has no bearing on concurrency.
-</li>
-<li>Concurrency and libhdfs/JNI
-<br />The libhdfs calls to JNI should always be creating thread local storage, so (in theory), libhdfs
-should be as thread safe as the underlying calls to the Hadoop FS.
-</li>
-</ul>
-</section>
-</body>
-</document>
+[... file re-added verbatim; the 110 added lines are identical to the removed lines above ...]
Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1360400-1399945
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Fri Oct 19 02:25:55 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSy
public HdfsDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
int bufferSize, short replication, long blockSize, Progressable progress,
- int bytesPerChecksum, boolean createParent) throws IOException {
+ ChecksumOpt checksumOpt, boolean createParent) throws IOException {
return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
absolutePermission, createFlag, createParent, replication, blockSize,
- progress, bufferSize, bytesPerChecksum), getStatistics());
+ progress, bufferSize, checksumOpt), getStatistics());
}
@Override
@@ -311,9 +312,6 @@ public class Hdfs extends AbstractFileSy
return listing.toArray(new FileStatus[listing.size()]);
}
- /**
- * {@inheritDoc}
- */
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException {
@@ -323,7 +321,7 @@ public class Hdfs extends AbstractFileSy
@Override
public void mkdir(Path dir, FsPermission permission, boolean createParent)
throws IOException, UnresolvedLinkException {
- dfs.mkdirs(getUriPath(dir), permission, createParent);
+ dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
}
@SuppressWarnings("deprecation")
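Two behavioral notes on the Hdfs.java hunk: createInternal now accepts a ChecksumOpt (checksum type plus bytes-per-checksum) in place of the bare bytesPerChecksum int, and mkdir delegates to primitiveMkdir. A hedged sketch of building the new argument, assuming the ChecksumOpt(type, bytesPerChecksum) constructor seen later in this commit; the CRC32C/512 values are illustrative only:

    // hypothetical values; any DataChecksum.Type and chunk size would do
    ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);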
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Fri Oct 19 02:25:55 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.Socket;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
/**
* A BlockReader is responsible for reading a single block
@@ -71,4 +72,8 @@ public interface BlockReader extends Byt
*/
boolean hasSentStatusCode();
+ /**
+ * @return a reference to the streams this block reader is using.
+ */
+ IOStreamPair getStreams();
}
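The new getStreams() accessor lets callers retrieve the underlying IO stream pair, e.g. to return a still-usable socket connection to a cache after the read finishes. Implementations without socket streams can simply return null; the BlockReaderLocal change later in this message does exactly that:

    @Override
    public IOStreamPair getStreams() {
      return null; // a local (short-circuit) read has no socket streams
    }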
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Oct 19 02:25:55 2012
@@ -25,7 +25,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -41,12 +46,13 @@ public class BlockReaderFactory {
Configuration conf,
Socket sock, String file,
ExtendedBlock block, Token<BlockTokenIdentifier> blockToken,
- long startOffset, long len) throws IOException {
+ long startOffset, long len, DataEncryptionKey encryptionKey)
+ throws IOException {
int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
return newBlockReader(new Conf(conf),
sock, file, block, blockToken, startOffset,
- len, bufferSize, true, "");
+ len, bufferSize, true, "", encryptionKey, null);
}
/**
@@ -73,14 +79,32 @@ public class BlockReaderFactory {
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
- String clientName)
+ String clientName,
+ DataEncryptionKey encryptionKey,
+ IOStreamPair ioStreams)
throws IOException {
+
if (conf.useLegacyBlockReader) {
+ if (encryptionKey != null) {
+ throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+ }
return RemoteBlockReader.newBlockReader(
sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
} else {
+ if (ioStreams == null) {
+ ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
+ if (encryptionKey != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ ioStreams.out, ioStreams.in, encryptionKey);
+ ioStreams = encryptedStreams;
+ }
+ }
+
return RemoteBlockReader2.newBlockReader(
- sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+ sock, file, block, blockToken, startOffset, len, bufferSize,
+ verifyChecksum, clientName, encryptionKey, ioStreams);
}
}
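The factory change above establishes the wire-encryption pattern used on this branch: when no stream pair is supplied, plain socket streams are created and, if a DataEncryptionKey is present, swapped for encrypted ones (the legacy block reader path instead rejects encryption outright). The essential shape, condensed from the hunk above:

    IOStreamPair ioStreams = new IOStreamPair(
        NetUtils.getInputStream(sock),
        NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
    if (encryptionKey != null) {
      // wrap both directions with encrypted streams
      ioStreams = DataTransferEncryptor.getEncryptedStreams(
          ioStreams.out, ioStreams.in, encryptionKey);
    }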
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Oct 19 02:25:55 2012
@@ -35,10 +35,12 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -84,11 +86,11 @@ class BlockReaderLocal implements BlockR
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
- DatanodeInfo node, Configuration conf, int socketTimeout)
- throws IOException {
+ DatanodeInfo node, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
if (proxy == null) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout);
+ socketTimeout, connectToDnViaHostname);
}
return proxy;
}
@@ -154,14 +156,16 @@ class BlockReaderLocal implements BlockR
*/
static BlockReaderLocal newBlockReader(Configuration conf, String file,
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
- int socketTimeout, long startOffset, long length) throws IOException {
+ int socketTimeout, long startOffset, long length,
+ boolean connectToDnViaHostname) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
- pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+ pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
+ connectToDnViaHostname);
}
// check to see if the file exists. It may so happen that the
@@ -239,11 +243,12 @@ class BlockReaderLocal implements BlockR
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token) throws IOException {
+ Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
- conf, timeout);
+ conf, timeout, connectToDnViaHostname);
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
@@ -285,7 +290,7 @@ class BlockReaderLocal implements BlockR
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException {
this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
- DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
+ DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
dataIn, startOffset, null);
}
@@ -315,23 +320,10 @@ class BlockReaderLocal implements BlockR
boolean success = false;
try {
// Skip both input streams to beginning of the chunk containing startOffset
- long toSkip = firstChunkOffset;
- while (toSkip > 0) {
- long skipped = dataIn.skip(toSkip);
- if (skipped == 0) {
- throw new IOException("Couldn't initialize input stream");
- }
- toSkip -= skipped;
- }
+ IOUtils.skipFully(dataIn, firstChunkOffset);
if (checksumIn != null) {
long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
- while (checkSumOffset > 0) {
- long skipped = checksumIn.skip(checkSumOffset);
- if (skipped == 0) {
- throw new IOException("Couldn't initialize checksum input stream");
- }
- checkSumOffset -= skipped;
- }
+ IOUtils.skipFully(checksumIn, checkSumOffset);
}
success = true;
} finally {
@@ -636,17 +628,9 @@ class BlockReaderLocal implements BlockR
slowReadBuff.position(slowReadBuff.limit());
checksumBuff.position(checksumBuff.limit());
- long dataSkipped = dataIn.skip(toskip);
- if (dataSkipped != toskip) {
- throw new IOException("skip error in data input stream");
- }
- long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
- if (checkSumOffset > 0) {
- long skipped = checksumIn.skip(checkSumOffset);
- if (skipped != checkSumOffset) {
- throw new IOException("skip error in checksum input stream");
- }
- }
+ IOUtils.skipFully(dataIn, toskip);
+ long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+ IOUtils.skipFully(checksumIn, checkSumOffset);
// read into the middle of the chunk
if (skipBuf == null) {
@@ -701,4 +685,9 @@ class BlockReaderLocal implements BlockR
public boolean hasSentStatusCode() {
return false;
}
-}
\ No newline at end of file
+
+ @Override
+ public IOStreamPair getStreams() {
+ return null;
+ }
+}
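The skip logic above is the mechanical part of this hunk: two hand-rolled skip loops become IOUtils.skipFully calls, which retry until the requested bytes are consumed and throw on premature EOF instead of silently under-skipping. Before and after, condensed from the diff:

    // before: loop until the requested bytes are skipped, failing on a
    // zero-byte skip
    long toSkip = firstChunkOffset;
    while (toSkip > 0) {
      long skipped = dataIn.skip(toSkip);
      if (skipped == 0) {
        throw new IOException("Couldn't initialize input stream");
      }
      toSkip -= skipped;
    }
    // after: one call with equivalent intent
    IOUtils.skipFully(dataIn, firstChunkOffset);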
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Oct 19 02:25:55 2012
@@ -22,12 +22,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -57,9 +60,9 @@ public abstract class ByteRangeInputStre
return url;
}
- protected abstract HttpURLConnection openConnection() throws IOException;
-
- protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+ /** Connect to server with a data offset. */
+ protected abstract HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException;
}
enum StreamStatus {
@@ -70,7 +73,7 @@ public abstract class ByteRangeInputStre
protected URLOpener resolvedURL;
protected long startPos = 0;
protected long currentPos = 0;
- protected long filelength;
+ protected Long fileLength = null;
StreamStatus status = StreamStatus.SEEK;
@@ -85,9 +88,6 @@ public abstract class ByteRangeInputStre
this.resolvedURL = r;
}
- protected abstract void checkResponseCode(final HttpURLConnection connection
- ) throws IOException;
-
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
@@ -113,35 +113,64 @@ public abstract class ByteRangeInputStre
protected InputStream openInputStream() throws IOException {
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
- final URLOpener opener =
- (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
- final HttpURLConnection connection = opener.openConnection(startPos);
- connection.connect();
- checkResponseCode(connection);
-
- final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
- if (cl == null) {
- throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
- }
- final long streamlength = Long.parseLong(cl);
- filelength = startPos + streamlength;
- // Java has a bug with >2GB request streams. It won't bounds check
- // the reads so the transfer blocks until the server times out
- InputStream is =
- new BoundedInputStream(connection.getInputStream(), streamlength);
+ final boolean resolved = resolvedURL.getURL() != null;
+ final URLOpener opener = resolved? resolvedURL: originalURL;
+ final HttpURLConnection connection = opener.connect(startPos, resolved);
resolvedURL.setURL(getResolvedUrl(connection));
-
- return is;
+
+ InputStream in = connection.getInputStream();
+ final Map<String, List<String>> headers = connection.getHeaderFields();
+ if (isChunkedTransferEncoding(headers)) {
+ // file length is not known
+ fileLength = null;
+ } else {
+ // for non-chunked transfer-encoding, get content-length
+ final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+ if (cl == null) {
+ throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+ + headers);
+ }
+ final long streamlength = Long.parseLong(cl);
+ fileLength = startPos + streamlength;
+
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ in = new BoundedInputStream(in, streamlength);
+ }
+
+ return in;
}
+ private static boolean isChunkedTransferEncoding(
+ final Map<String, List<String>> headers) {
+ return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+ || contains(headers, HttpHeaders.TE, "chunked");
+ }
+
+ /** Does the HTTP header map contain the given key, value pair? */
+ private static boolean contains(final Map<String, List<String>> headers,
+ final String key, final String value) {
+ final List<String> values = headers.get(key);
+ if (values != null) {
+ for(String v : values) {
+ for(final StringTokenizer t = new StringTokenizer(v, ",");
+ t.hasMoreTokens(); ) {
+ if (value.equalsIgnoreCase(t.nextToken())) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
private int update(final int n) throws IOException {
if (n != -1) {
currentPos += n;
- } else if (currentPos < filelength) {
+ } else if (fileLength != null && currentPos < fileLength) {
throw new IOException("Got EOF but currentPos = " + currentPos
- + " < filelength = " + filelength);
+ + " < filelength = " + fileLength);
}
return n;
}
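The ByteRangeInputStream rework makes the file length optional: a chunked transfer-encoding response carries no Content-Length, so filelength becomes a nullable Long and the EOF check in update() only fires when a length is actually known. A sketch of the decision, using the helpers defined in the hunk above (simplified: the real code throws an IOException when Content-Length is missing on a non-chunked response):

    Map<String, List<String>> headers = connection.getHeaderFields();
    if (isChunkedTransferEncoding(headers)) {
      fileLength = null;   // length unknown; accept EOF at any position
    } else {
      String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
      fileLength = startPos + Long.parseLong(cl);
    }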
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Oct 19 02:25:55 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -47,12 +49,15 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -64,6 +69,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -73,8 +79,10 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -82,12 +90,17 @@ import org.apache.hadoop.fs.FileAlreadyE
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -97,16 +110,18 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -114,9 +129,10 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -136,6 +152,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -173,11 +190,12 @@ public class DFSClient implements java.i
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
- final LeaseRenewer leaserenewer;
+ private final String authority;
final SocketCache socketCache;
final Conf dfsClientConf;
private Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
+ private DataEncryptionKey encryptionKey;
/**
* DFSClient configuration
@@ -189,11 +207,11 @@ public class DFSClient implements java.i
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
- final int checksumType;
- final int bytesPerChecksum;
+ final ChecksumOpt defaultChecksumOpt;
final int writePacketSize;
final int socketTimeout;
final int socketCacheCapacity;
+ final long socketCacheExpiry;
/** Wait time window (in msec) if BlockMissingException is caught */
final int timeWindow;
final int nCachedConnRetry;
@@ -205,6 +223,10 @@ public class DFSClient implements java.i
final String taskId;
final FsPermission uMask;
final boolean useLegacyBlockReader;
+ final boolean connectToDnViaHostname;
+ final boolean getHdfsBlocksMetadataEnabled;
+ final int getFileBlockStorageLocationsNumThreads;
+ final int getFileBlockStorageLocationsTimeout;
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
@@ -225,9 +247,7 @@ public class DFSClient implements java.i
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- checksumType = getChecksumType(conf);
- bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
- DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ defaultChecksumOpt = getChecksumOptFromConf(conf);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
/** dfs.write.packet.size is an internal config variable */
@@ -240,6 +260,8 @@ public class DFSClient implements java.i
taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+ socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * defaultBlockSize);
timeWindow = conf
@@ -255,26 +277,59 @@ public class DFSClient implements java.i
useLegacyBlockReader = conf.getBoolean(
DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
- }
-
- private int getChecksumType(Configuration conf) {
- String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
+ connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+ DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ getHdfsBlocksMetadataEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ getFileBlockStorageLocationsNumThreads = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
+ getFileBlockStorageLocationsTimeout = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
+ DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+ }
+
+ private DataChecksum.Type getChecksumType(Configuration conf) {
+ final String checksum = conf.get(
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
- if ("CRC32".equals(checksum)) {
- return DataChecksum.CHECKSUM_CRC32;
- } else if ("CRC32C".equals(checksum)) {
- return DataChecksum.CHECKSUM_CRC32C;
- } else if ("NULL".equals(checksum)) {
- return DataChecksum.CHECKSUM_NULL;
- } else {
- LOG.warn("Bad checksum type: " + checksum + ". Using default.");
- return DataChecksum.CHECKSUM_CRC32C;
+ try {
+ return DataChecksum.Type.valueOf(checksum);
+ } catch(IllegalArgumentException iae) {
+ LOG.warn("Bad checksum type: " + checksum + ". Using default "
+ + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
+ return DataChecksum.Type.valueOf(
+ DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
}
}
- private DataChecksum createChecksum() {
- return DataChecksum.newDataChecksum(
- checksumType, bytesPerChecksum);
+ // Construct a checksum option from conf
+ private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+ DataChecksum.Type type = getChecksumType(conf);
+ int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+ DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ return new ChecksumOpt(type, bytesPerChecksum);
+ }
+
+ // create a DataChecksum with the default option.
+ private DataChecksum createChecksum() throws IOException {
+ return createChecksum(null);
+ }
+
+ private DataChecksum createChecksum(ChecksumOpt userOpt)
+ throws IOException {
+ // Fill in any missing field with the default.
+ ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+ defaultChecksumOpt, userOpt);
+ DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+ myOpt.getChecksumType(),
+ myOpt.getBytesPerChecksum());
+ if (dataChecksum == null) {
+ throw new IOException("Invalid checksum type specified: "
+ + myOpt.getChecksumType().name());
+ }
+ return dataChecksum;
}
}
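
(For illustration: the new createChecksum(ChecksumOpt) above fills any field the
caller left unset with the client-wide default before building a DataChecksum.
A minimal, self-contained sketch of that merge, with hypothetical names standing
in for ChecksumOpt.processChecksumOpt:)

    import java.util.Objects;

    // Hypothetical stand-in for the merging done by
    // ChecksumOpt.processChecksumOpt(defaultOpt, userOpt).
    final class SimpleChecksumOpt {
      enum Type { CRC32, CRC32C, NULL }

      final Type type;             // null means "not specified by the caller"
      final int bytesPerChecksum;  // -1 means "not specified by the caller"

      SimpleChecksumOpt(Type type, int bytesPerChecksum) {
        this.type = type;
        this.bytesPerChecksum = bytesPerChecksum;
      }

      // Fill in any field the caller left unset with the client-wide default.
      static SimpleChecksumOpt process(SimpleChecksumOpt defaults,
                                       SimpleChecksumOpt userOpt) {
        Objects.requireNonNull(defaults, "defaults");
        if (userOpt == null) {
          return defaults; // no user preference at all
        }
        Type t = userOpt.type != null ? userOpt.type : defaults.type;
        int b = userOpt.bytesPerChecksum > 0
            ? userOpt.bytesPerChecksum : defaults.bytesPerChecksum;
        return new SimpleChecksumOpt(t, b);
      }
    }
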
@@ -343,12 +398,9 @@ public class DFSClient implements java.i
this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser();
- final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
- this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
- this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
-
- this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-
+ this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
+ this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
+ DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
if (rpcNamenode != null) {
// This case is used for testing.
@@ -379,6 +431,8 @@ public class DFSClient implements java.i
Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
Joiner.on(',').join(localInterfaceAddrs) + "]");
}
+
+    this.socketCache = SocketCache.getInstance(
+        dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
}
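
(For illustration: SocketCache.getInstance(capacity, expiry) above hands every
DFSClient the same process-wide cache, bounded both by entry count and by age.
A sketch of a cache under those two bounds; the real SocketCache also evicts
expired sockets from a background daemon, and all names below are illustrative:)

    import java.net.Socket;
    import java.net.SocketAddress;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class SimpleSocketCache {
      private static SimpleSocketCache instance;

      private final int capacity;
      private final long expiryMs;
      // Access-ordered map gives least-recently-used eviction order.
      private final LinkedHashMap<SocketAddress, Entry> cache =
          new LinkedHashMap<SocketAddress, Entry>(16, 0.75f, true);

      private static final class Entry {
        final Socket sock;
        final long createdAt = System.currentTimeMillis();
        Entry(Socket sock) { this.sock = sock; }
      }

      private SimpleSocketCache(int capacity, long expiryMs) {
        this.capacity = capacity;
        this.expiryMs = expiryMs;
      }

      // The first caller fixes capacity and expiry for the whole process.
      static synchronized SimpleSocketCache getInstance(int capacity,
                                                        long expiryMs) {
        if (instance == null) {
          instance = new SimpleSocketCache(capacity, expiryMs);
        }
        return instance;
      }

      synchronized void put(SocketAddress addr, Socket sock) {
        cache.put(addr, new Entry(sock));
        // Evict least-recently-used entries beyond capacity.
        Iterator<Map.Entry<SocketAddress, Entry>> it =
            cache.entrySet().iterator();
        while (cache.size() > capacity && it.hasNext()) {
          it.next();
          it.remove();
        }
      }

      // Check a socket out of the cache; expired entries count as misses.
      synchronized Socket get(SocketAddress addr) {
        Entry e = cache.remove(addr);
        if (e == null || System.currentTimeMillis() - e.createdAt > expiryMs) {
          return null;
        }
        return e.sock;
      }
    }
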
/**
@@ -466,6 +520,14 @@ public class DFSClient implements java.i
return clientName;
}
+ /**
+ * @return whether the client should use hostnames instead of IPs
+ * when connecting to DataNodes
+ */
+ boolean connectToDnViaHostname() {
+ return dfsClientConf.connectToDnViaHostname;
+ }
+
void checkOpen() throws IOException {
if (!clientRunning) {
IOException result = new IOException("Filesystem closed");
@@ -473,7 +535,30 @@ public class DFSClient implements java.i
}
}
- /** Put a file. */
+ /** Return the lease renewer instance. The renewer thread won't start
+ * until the first output stream is created. The same instance will
+ * be returned until all output streams are closed.
+ */
+ public LeaseRenewer getLeaseRenewer() throws IOException {
+ return LeaseRenewer.getInstance(authority, ugi, this);
+ }
+
+ /** Get a lease and start automatic renewal */
+ private void beginFileLease(final String src, final DFSOutputStream out)
+ throws IOException {
+ getLeaseRenewer().put(src, out, this);
+ }
+
+ /** Stop renewal of lease for the file. */
+ void endFileLease(final String src) throws IOException {
+ getLeaseRenewer().closeFile(src, this);
+ }
+
+ /** Put a file. Only called from LeaseRenewer, where proper locking is
+ * enforced to consistently update its local dfsclients array and
+ * client's filesBeingWritten map.
+ */
void putFileBeingWritten(final String src, final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(src, out);
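
(For illustration: with this change the client no longer pins a LeaseRenewer for
its whole lifetime; getLeaseRenewer() looks the shared instance up on demand,
and the renewer thread runs only while at least one output stream is open. A
sketch of that start-on-first-file, stop-on-last-file lifecycle, illustrative
rather than the real LeaseRenewer:)

    import java.util.HashSet;
    import java.util.Set;

    final class SketchLeaseRenewer {
      private final Set<String> openFiles = new HashSet<String>();
      private Thread daemon;

      synchronized void put(String src) {
        openFiles.add(src);
        if (daemon == null) {
          // First open file: start the renewal thread.
          daemon = new Thread(new Runnable() {
            public void run() { renewLoop(); }
          }, "LeaseRenewer");
          daemon.setDaemon(true);
          daemon.start();
        }
      }

      synchronized void closeFile(String src) {
        openFiles.remove(src);
        if (openFiles.isEmpty()) {
          daemon = null; // renewLoop() observes this and exits
          notifyAll();
        }
      }

      private void renewLoop() {
        final Thread me = Thread.currentThread();
        while (true) {
          synchronized (this) {
            if (daemon != me) {
              return; // all files closed; a later put() starts a new thread
            }
            // ... renew leases for openFiles here ...
            try {
              wait(1000); // renewal period, illustrative
            } catch (InterruptedException ie) {
              return;
            }
          }
        }
      }
    }
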
@@ -486,7 +571,7 @@ public class DFSClient implements java.i
}
}
- /** Remove a file. */
+ /** Remove a file. Only called from LeaseRenewer. */
void removeFileBeingWritten(final String src) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(src);
@@ -517,7 +602,7 @@ public class DFSClient implements java.i
if (filesBeingWritten.isEmpty()) {
return;
}
- lastLeaseRenewal = System.currentTimeMillis();
+ lastLeaseRenewal = Time.now();
}
}
@@ -534,7 +619,7 @@ public class DFSClient implements java.i
return true;
} catch (IOException e) {
// Abort if the lease has already expired.
- final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
+ final long elapsed = Time.now() - getLastLeaseRenewal();
if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
LOG.warn("Failed to renew lease for " + clientName + " for "
+ (elapsed/1000) + " seconds (>= soft-limit ="
@@ -561,7 +646,14 @@ public class DFSClient implements java.i
void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
- socketCache.clear();
+
+ try {
+ // remove reference to this client and stop the renewer,
+ // if there is no more clients under the renewer.
+ getLeaseRenewer().closeClient(this);
+ } catch (IOException ioe) {
+ LOG.info("Exception occurred while aborting the client. " + ioe);
+ }
closeConnectionToNamenode();
}
@@ -596,18 +688,29 @@ public class DFSClient implements java.i
* Close the file system, abandoning all of the leases and files being
* created and close connections to the namenode.
*/
+ @Override
public synchronized void close() throws IOException {
if(clientRunning) {
closeAllFilesBeingWritten(false);
- socketCache.clear();
clientRunning = false;
- leaserenewer.closeClient(this);
+ getLeaseRenewer().closeClient(this);
// close connections to the namenode
closeConnectionToNamenode();
}
}
/**
+ * Close all open streams, abandoning all of the leases and files being
+ * created.
+   * @param abort whether streams should be aborted rather than
+   *          gracefully closed
+ */
+ public void closeOutputStreams(boolean abort) {
+ if (clientRunning) {
+ closeAllFilesBeingWritten(abort);
+ }
+ }
+
+ /**
* Get the default block size for this cluster
* @return the default block size in bytes
*/
@@ -632,7 +735,7 @@ public class DFSClient implements java.i
* @see ClientProtocol#getServerDefaults()
*/
public FsServerDefaults getServerDefaults() throws IOException {
- long now = System.currentTimeMillis();
+ long now = Time.now();
if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
serverDefaults = namenode.getServerDefaults();
serverDefaultsLastUpdate = now;
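
(For illustration: getServerDefaults() above refetches from the NameNode only
once the cached copy is older than SERVER_DEFAULTS_VALIDITY_PERIOD. The same
time-bounded caching pattern, sketched generically with hypothetical names:)

    import java.util.concurrent.Callable;

    // A single-value cache that refetches after a validity window.
    final class TimedCache<T> {
      private final long validityMs;
      private final Callable<T> fetcher; // stands in for the NameNode RPC
      private T value;
      private long lastUpdate; // 0 forces a fetch on first use

      TimedCache(long validityMs, Callable<T> fetcher) {
        this.validityMs = validityMs;
        this.fetcher = fetcher;
      }

      synchronized T get() throws Exception {
        long now = System.currentTimeMillis();
        if (value == null || now - lastUpdate > validityMs) {
          value = fetcher.call(); // refetch only when missing or stale
          lastUpdate = now;
        }
        return value;
      }
    }
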
@@ -690,12 +793,12 @@ public class DFSClient implements java.i
*/
static BlockReader getLocalBlockReader(Configuration conf,
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
- DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
- throws InvalidToken, IOException {
+ DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
+ boolean connectToDnViaHostname) throws InvalidToken, IOException {
try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock);
+ - offsetIntoBlock, connectToDnViaHostname);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
@@ -891,7 +994,81 @@ public class DFSClient implements java.i
public BlockLocation[] getBlockLocations(String src, long start,
long length) throws IOException, UnresolvedLinkException {
LocatedBlocks blocks = getLocatedBlocks(src, start, length);
- return DFSUtil.locatedBlocks2Locations(blocks);
+ BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+ for (int i = 0; i < locations.length; i++) {
+ hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+ }
+ return hdfsLocations;
+ }
+
+ /**
+ * Get block location information about a list of {@link HdfsBlockLocation}.
+ * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
+ * get {@link BlockStorageLocation}s for blocks returned by
+   * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}.
+ *
+   * This is done by making a round of RPCs to the associated datanodes,
+   * asking each for the volume of every block replica it hosts. The returned
+   * array of {@link BlockStorageLocation} exposes this information as a
+   * {@link VolumeId}.
+ *
+ * @param blockLocations
+ * target blocks on which to query volume location information
+   * @return the original block array augmented with additional volume
+   *         location information for each replica.
+ */
+ public BlockStorageLocation[] getBlockStorageLocations(
+ List<BlockLocation> blockLocations) throws IOException,
+ UnsupportedOperationException, InvalidBlockTokenException {
+ if (!getConf().getHdfsBlocksMetadataEnabled) {
+ throw new UnsupportedOperationException("Datanode-side support for " +
+ "getVolumeBlockLocations() must also be enabled in the client " +
+ "configuration.");
+ }
+ // Downcast blockLocations and fetch out required LocatedBlock(s)
+ List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
+ for (BlockLocation loc : blockLocations) {
+ if (!(loc instanceof HdfsBlockLocation)) {
+        throw new ClassCastException("DFSClient#getBlockStorageLocations " +
+            "expects to be passed HdfsBlockLocations");
+ }
+ HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
+ blocks.add(hdfsLoc.getLocatedBlock());
+ }
+
+    // Group the LocatedBlocks by datanode, mapping each datanode to the
+    // list of LocatedBlocks it hosts.
+ Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
+ new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
+ for (LocatedBlock b : blocks) {
+ for (DatanodeInfo info : b.getLocations()) {
+ if (!datanodeBlocks.containsKey(info)) {
+ datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
+ }
+ List<LocatedBlock> l = datanodeBlocks.get(info);
+ l.add(b);
+ }
+ }
+
+ // Make RPCs to the datanodes to get volume locations for its replicas
+ List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
+ .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+ getConf().getFileBlockStorageLocationsNumThreads,
+ getConf().getFileBlockStorageLocationsTimeout,
+ getConf().connectToDnViaHostname);
+
+ // Regroup the returned VolumeId metadata to again be grouped by
+ // LocatedBlock rather than by datanode
+ Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
+ .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);
+
+ // Combine original BlockLocations with new VolumeId information
+ BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
+ .convertToVolumeBlockLocations(blocks, blockVolumeIds);
+
+ return volumeBlockLocations;
}
public DFSInputStream open(String src)
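
(For illustration: the grouping step in getBlockStorageLocations() above
inverts the block-to-replica mapping into a datanode-to-blocks map, so a single
RPC per datanode covers all of its replicas. The inversion, sketched over plain
strings:)

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    final class GroupByDatanode {
      // Invert "block -> datanodes holding it" into
      // "datanode -> blocks it holds", preserving first-seen order.
      static Map<String, List<String>> invert(
          Map<String, List<String>> blockToNodes) {
        Map<String, List<String>> nodeToBlocks =
            new LinkedHashMap<String, List<String>>();
        for (Map.Entry<String, List<String>> e : blockToNodes.entrySet()) {
          for (String node : e.getValue()) {
            List<String> l = nodeToBlocks.get(node);
            if (l == null) {
              l = new ArrayList<String>();
              nodeToBlocks.put(node, l);
            }
            l.add(e.getKey());
          }
        }
        return nodeToBlocks;
      }

      public static void main(String[] args) {
        Map<String, List<String>> blocks =
            new LinkedHashMap<String, List<String>>();
        blocks.put("blk_1", Arrays.asList("dn1", "dn2"));
        blocks.put("blk_2", Arrays.asList("dn2", "dn3"));
        // Prints {dn1=[blk_1], dn2=[blk_1, blk_2], dn3=[blk_2]}
        System.out.println(invert(blocks));
      }
    }
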
@@ -1002,12 +1179,13 @@ public class DFSClient implements java.i
return create(src, FsPermission.getDefault(),
overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
- buffersize);
+ buffersize, null);
}
/**
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
- * long, Progressable, int)} with <code>createParent</code> set to true.
+ * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+ * set to true.
*/
public DFSOutputStream create(String src,
FsPermission permission,
@@ -1015,10 +1193,11 @@ public class DFSClient implements java.i
short replication,
long blockSize,
Progressable progress,
- int buffersize)
+ int buffersize,
+ ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
- replication, blockSize, progress, buffersize);
+ replication, blockSize, progress, buffersize, checksumOpt);
}
/**
@@ -1036,6 +1215,7 @@ public class DFSClient implements java.i
* @param blockSize maximum block size
* @param progress interface for reporting client progress
* @param buffersize underlying buffer size
+ * @param checksumOpt checksum options
*
* @return output stream
*
@@ -1049,8 +1229,8 @@ public class DFSClient implements java.i
short replication,
long blockSize,
Progressable progress,
- int buffersize)
- throws IOException {
+ int buffersize,
+ ChecksumOpt checksumOpt) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
@@ -1061,8 +1241,8 @@ public class DFSClient implements java.i
}
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
- buffersize, dfsClientConf.createChecksum());
- leaserenewer.put(src, result, this);
+ buffersize, dfsClientConf.createChecksum(checksumOpt));
+ beginFileLease(src, result);
return result;
}
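
(For illustration: a caller can now override the checksum per file rather than
per client. A hedged usage sketch; the create() parameter list and the
ChecksumOpt constructor match the diff above, but the import paths and class
visibility are assumptions to double-check:)

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.util.DataChecksum;

    // Assumed to live where DFSClient and DFSOutputStream are visible,
    // e.g. in org.apache.hadoop.hdfs.
    class ChecksumOptExample {
      static DFSOutputStream createWithCrc32c(DFSClient client)
          throws IOException {
        // Per-file override: CRC32C with 512 bytes per checksum. Any field
        // left unset in the ChecksumOpt is filled from the client default.
        ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
        return client.create("/tmp/example", FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
            (short) 3, 128L * 1024 * 1024, null /* progress */, 4096, opt);
      }
    }
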
@@ -1099,20 +1279,18 @@ public class DFSClient implements java.i
long blockSize,
Progressable progress,
int buffersize,
- int bytesPerChecksum)
+ ChecksumOpt checksumOpt)
throws IOException, UnresolvedLinkException {
checkOpen();
CreateFlag.validate(flag);
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
if (result == null) {
- DataChecksum checksum = DataChecksum.newDataChecksum(
- dfsClientConf.checksumType,
- bytesPerChecksum);
+ DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
flag, createParent, replication, blockSize, progress, buffersize,
checksum);
}
- leaserenewer.put(src, result, this);
+ beginFileLease(src, result);
return result;
}
@@ -1198,7 +1376,7 @@ public class DFSClient implements java.i
+ src + " on client " + clientName);
}
final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
- leaserenewer.put(src, result, this);
+ beginFileLease(src, result);
return result;
}
@@ -1385,7 +1563,45 @@ public class DFSClient implements java.i
*/
public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
checkOpen();
- return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);
+ return getFileChecksum(src, namenode, socketFactory,
+ dfsClientConf.socketTimeout, getDataEncryptionKey(),
+ dfsClientConf.connectToDnViaHostname);
+ }
+
+ @InterfaceAudience.Private
+ public void clearDataEncryptionKey() {
+ LOG.debug("Clearing encryption key");
+ synchronized (this) {
+ encryptionKey = null;
+ }
+ }
+
+ /**
+ * @return true if data sent between this client and DNs should be encrypted,
+ * false otherwise.
+ * @throws IOException in the event of error communicating with the NN
+ */
+ boolean shouldEncryptData() throws IOException {
+ FsServerDefaults d = getServerDefaults();
+ return d == null ? false : d.getEncryptDataTransfer();
+ }
+
+ @InterfaceAudience.Private
+ public DataEncryptionKey getDataEncryptionKey()
+ throws IOException {
+ if (shouldEncryptData()) {
+ synchronized (this) {
+        if (encryptionKey == null ||
+            encryptionKey.expiryDate < Time.now()) {
+ LOG.debug("Getting new encryption token from NN");
+ encryptionKey = namenode.getDataEncryptionKey();
+ }
+ return encryptionKey;
+ }
+ } else {
+ return null;
+ }
}
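
(For illustration: getDataEncryptionKey() above fetches lazily and refetches
only once the cached key's expiryDate passes. The same refresh-on-expiry
pattern, sketched with hypothetical names:)

    final class ExpiringTokenHolder {
      static final class Token {
        final byte[] material;
        final long expiryDate; // absolute time in milliseconds
        Token(byte[] material, long expiryDate) {
          this.material = material;
          this.expiryDate = expiryDate;
        }
      }

      interface TokenSource { Token fetch(); } // stands in for the NN RPC

      private final TokenSource source;
      private Token token;

      ExpiringTokenHolder(TokenSource source) { this.source = source; }

      synchronized Token get() {
        if (token == null || token.expiryDate < System.currentTimeMillis()) {
          token = source.fetch(); // refetch only when missing or expired
        }
        return token;
      }
    }
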
/**
@@ -1394,8 +1610,9 @@ public class DFSClient implements java.i
* @return The checksum
*/
public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
- ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
- ) throws IOException {
+ ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+ DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+ throws IOException {
//get all block locations
LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
if (null == blockLocations) {
@@ -1403,7 +1620,8 @@ public class DFSClient implements java.i
}
List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
final DataOutputBuffer md5out = new DataOutputBuffer();
- int bytesPerCRC = 0;
+ int bytesPerCRC = -1;
+ DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
long crcPerBlock = 0;
boolean refetchBlocks = false;
int lastRetriedIndex = -1;
@@ -1433,15 +1651,25 @@ public class DFSClient implements java.i
try {
//connect to a datanode
sock = socketFactory.createSocket();
- NetUtils.connect(sock,
- NetUtils.createSocketAddr(datanodes[j].getXferAddr()),
- timeout);
+ String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout);
- out = new DataOutputStream(
- new BufferedOutputStream(NetUtils.getOutputStream(sock),
- HdfsConstants.SMALL_BUFFER_SIZE));
- in = new DataInputStream(NetUtils.getInputStream(sock));
+ OutputStream unbufOut = NetUtils.getOutputStream(sock);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ if (encryptionKey != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, encryptionKey);
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j] + ": "
@@ -1497,6 +1725,17 @@ public class DFSClient implements java.i
checksumData.getMd5().toByteArray());
md5.write(md5out);
+ // read crc-type
+ final DataChecksum.Type ct = HdfsProtoUtil.
+ fromProto(checksumData.getCrcType());
+ if (i == 0) { // first block
+ crcType = ct;
+ } else if (crcType != DataChecksum.Type.MIXED
+ && crcType != ct) {
+ // if crc types are mixed in a file
+ crcType = DataChecksum.Type.MIXED;
+ }
+
done = true;
if (LOG.isDebugEnabled()) {
@@ -1522,7 +1761,18 @@ public class DFSClient implements java.i
//compute file MD5
final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
- return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+ switch (crcType) {
+ case CRC32:
+ return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ case CRC32C:
+ return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+ crcPerBlock, fileMD5);
+ default:
+ // we should never get here since the validity was checked
+ // when getCrcType() was called above.
+ return null;
+ }
}
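
(For illustration: the crc-type handling above fixes the file-level type from
the first block and degrades it to MIXED on any disagreement, which then
selects between the Gzip and Castagnoli checksum classes. The fold, sketched:)

    enum CrcType { CRC32, CRC32C, MIXED }

    final class CrcTypeFold {
      // Fold per-block CRC types into one file-level type;
      // returns null for an empty file (no blocks).
      static CrcType combine(CrcType[] perBlock) {
        CrcType result = null;
        for (CrcType ct : perBlock) {
          if (result == null) {
            result = ct;            // first block fixes the candidate type
          } else if (result != CrcType.MIXED && result != ct) {
            result = CrcType.MIXED; // disagreement: stays MIXED from here on
          }
        }
        return result;
      }
    }
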
/**
@@ -1635,6 +1885,20 @@ public class DFSClient implements java.i
throw re.unwrapRemoteException(AccessControlException.class);
}
}
+
+ /**
+ * Rolls the edit log on the active NameNode.
+ * @return the txid of the new log segment
+ *
+ * @see ClientProtocol#rollEdits()
+ */
+ long rollEdits() throws AccessControlException, IOException {
+ try {
+ return namenode.rollEdits();
+ } catch(RemoteException re) {
+ throw re.unwrapRemoteException(AccessControlException.class);
+ }
+ }
/**
* enable/disable restore failed storage.
@@ -1686,14 +1950,6 @@ public class DFSClient implements java.i
}
/**
- * @see ClientProtocol#distributedUpgradeProgress(HdfsConstants.UpgradeAction)
- */
- public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
- throws IOException {
- return namenode.distributedUpgradeProgress(action);
- }
-
- /**
*/
@Deprecated
public boolean mkdirs(String src) throws IOException {
@@ -1715,34 +1971,29 @@ public class DFSClient implements java.i
*/
public boolean mkdirs(String src, FsPermission permission,
boolean createParent) throws IOException {
- checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
- if(LOG.isDebugEnabled()) {
- LOG.debug(src + ": masked=" + masked);
- }
- try {
- return namenode.mkdirs(src, masked, createParent);
- } catch(RemoteException re) {
- throw re.unwrapRemoteException(AccessControlException.class,
- InvalidPathException.class,
- FileAlreadyExistsException.class,
- FileNotFoundException.class,
- ParentNotDirectoryException.class,
- SafeModeException.class,
- NSQuotaExceededException.class,
- UnresolvedPathException.class);
- }
+ return primitiveMkdir(src, masked, createParent);
}
-
+
/**
   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
   * that the permissions have already been masked against umask.
*/
public boolean primitiveMkdir(String src, FsPermission absPermission)
throws IOException {
+ return primitiveMkdir(src, absPermission, true);
+ }
+
+ /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permissions have already been masked against umask.
+ */
+ public boolean primitiveMkdir(String src, FsPermission absPermission,
+ boolean createParent)
+ throws IOException {
checkOpen();
if (absPermission == null) {
absPermission =
@@ -1753,15 +2004,20 @@ public class DFSClient implements java.i
LOG.debug(src + ": masked=" + absPermission);
}
try {
- return namenode.mkdirs(src, absPermission, true);
+ return namenode.mkdirs(src, absPermission, createParent);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
+ InvalidPathException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ SafeModeException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class,
UnresolvedPathException.class);
}
}
-
+
/**
* Get {@link ContentSummary} rooted at the specified directory.
* @param path The string representation of the path
@@ -1833,10 +2089,7 @@ public class DFSClient implements java.i
}
boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
- if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
- return true;
- }
- return false;
+ return shortCircuitLocalReads && isLocalAddress(targetAddr);
}
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {