You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:22 UTC
[20/50] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
new file mode 100644
index 0000000..fb57825
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -0,0 +1,1904 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+import org.apache.htrace.NullScope;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceInfo;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*********************************************************************
+ *
+ * The DataStreamer class is responsible for sending data packets to the
+ * datanodes in the pipeline. It retrieves a new blockid and block locations
+ * from the namenode, and starts streaming packets to the pipeline of
+ * Datanodes. Every packet has a sequence number associated with
+ * it. When all the packets for a block are sent out and acks for each
+ * of them are received, the DataStreamer closes the current block.
+ *
+ * The DataStreamer thread picks up a packet from the dataQueue, sends it to
+ * the first datanode in the pipeline and moves it from the dataQueue to the
+ * ackQueue. The ResponseProcessor receives acks from the datanodes. When a
+ * successful ack for a packet is received from all datanodes, the
+ * ResponseProcessor removes the corresponding packet from the ackQueue.
+ *
+ * In case of error, all outstanding packets are moved out of the ackQueue. A
+ * new pipeline is set up by eliminating the bad datanode from the original
+ * pipeline. The DataStreamer then resumes sending packets from the dataQueue.
+ *
+ *********************************************************************/
+
+@InterfaceAudience.Private
+class DataStreamer extends Daemon {
+ /** Logger shared by the streamer and its nested helper classes. */
+ static final Logger LOG = LoggerFactory.getLogger(DataStreamer.class);
+
+  /**
+   * Create a socket for a write pipeline and connect it to the first datanode.
+   *
+   * <p>If connecting or configuring the socket fails, the socket is closed
+   * before the exception propagates. The caller never receives a reference
+   * in that case, so it could not close the socket itself.
+   *
+   * @param first the first datanode
+   * @param length the pipeline length, used to derive the socket read timeout
+   * @param client client
+   * @return the socket connected to the first datanode
+   * @throws IOException if the socket cannot be created, connected or configured
+   */
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
+    final DfsClientConf conf = client.getConf();
+    final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
+    LOG.debug("Connecting to datanode {}", dnAddr);
+    final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
+    final Socket sock = client.socketFactory.createSocket();
+    boolean success = false;
+    try {
+      final int timeout = client.getDatanodeReadTimeout(length);
+      NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(),
+          conf.getSocketTimeout());
+      sock.setSoTimeout(timeout);
+      sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+      if (LOG.isDebugEnabled()) {
+        // guarded: getSendBufferSize() queries the socket
+        LOG.debug("Send buf size " + sock.getSendBufferSize());
+      }
+      success = true;
+      return sock;
+    } finally {
+      if (!success) {
+        // Do not leak the file descriptor when connect/configure fails.
+        IOUtils.closeSocket(sock);
+      }
+    }
+  }
+
+  /**
+   * Tell whether a file is lazy persist, i.e. whether its storage policy id
+   * is {@link HdfsConstants#MEMORY_STORAGE_POLICY_ID}.
+   *
+   * @param stat the HdfsFileStatus of a file
+   * @return true if the file's storage policy is the memory policy
+   */
+  static boolean isLazyPersist(HdfsFileStatus stat) {
+    return HdfsConstants.MEMORY_STORAGE_POLICY_ID == stat.getStoragePolicy();
+  }
+
+  /**
+   * Hand the buffer of every given packet back to a ByteArrayManager, then
+   * empty the list.
+   *
+   * @param packets packets whose buffers are released; cleared afterwards
+   * @param bam the ByteArrayManager receiving the buffers
+   */
+  private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
+    for (final DFSPacket packet : packets) {
+      packet.releaseBuffer(bam);
+    }
+    // Only cleared after every buffer has been released.
+    packets.clear();
+  }
+
+  /** Holds the most recent exception raised while streaming, if any. */
+  static class LastExceptionInStreamer {
+    private IOException thrown;
+
+    /**
+     * Record an exception. Anything that is not an IOException is wrapped in
+     * one.
+     */
+    synchronized void set(Throwable t) {
+      assert t != null;
+      if (t instanceof IOException) {
+        thrown = (IOException) t;
+      } else {
+        thrown = new IOException(t);
+      }
+    }
+
+    /** Forget the recorded exception. */
+    synchronized void clear() {
+      thrown = null;
+    }
+
+    /**
+     * Check if there already is an exception and rethrow it.
+     *
+     * @param resetToNull whether to forget the exception before rethrowing it
+     */
+    synchronized void check(boolean resetToNull) throws IOException {
+      final IOException pending = thrown;
+      if (pending == null) {
+        return;
+      }
+      if (LOG.isTraceEnabled()) {
+        // wrap and print the exception to know when the check is called
+        LOG.trace("Got Exception while checking", new Throwable(pending));
+      }
+      if (resetToNull) {
+        thrown = null;
+      }
+      throw pending;
+    }
+
+    /**
+     * Rethrow the recorded exception if there is one. Otherwise throw a
+     * ClosedChannelException.
+     */
+    synchronized void throwException4Close() throws IOException {
+      check(false);
+      throw new ClosedChannelException();
+    }
+  }
+
+ /**
+ * Error state of the write pipeline. It records:
+ * - whether an error occurred;
+ * - the index of the datanode considered bad (-1 if none);
+ * - the index (-1 if none) and deadline of a datanode reported as restarting.
+ * Every method except the constructor is synchronized on this object.
+ */
+ static class ErrorState {
+ private boolean error = false;
+ private int badNodeIndex = -1;
+ private int restartingNodeIndex = -1;
+ // Time.monotonicNow() value after which the restarting node is declared
+ // dead. Set by initRestartingNode(); reset to 0 by reset() and on expiry.
+ private long restartingNodeDeadline = 0;
+ // How long (ms) to wait for a restarting datanode to come back.
+ private final long datanodeRestartTimeout;
+
+ ErrorState(long datanodeRestartTimeout) {
+ this.datanodeRestartTimeout = datanodeRestartTimeout;
+ }
+
+ /** Clear the error flag and all node markings. */
+ synchronized void reset() {
+ error = false;
+ badNodeIndex = -1;
+ restartingNodeIndex = -1;
+ restartingNodeDeadline = 0;
+ }
+
+ synchronized boolean hasError() {
+ return error;
+ }
+
+ /** @return true if an error is flagged and some node is marked bad or restarting. */
+ synchronized boolean hasDatanodeError() {
+ return error && isNodeMarked();
+ }
+
+ synchronized void setError(boolean err) {
+ this.error = err;
+ }
+
+ synchronized void setBadNodeIndex(int index) {
+ this.badNodeIndex = index;
+ }
+
+ synchronized int getBadNodeIndex() {
+ return badNodeIndex;
+ }
+
+ synchronized int getRestartingNodeIndex() {
+ return restartingNodeIndex;
+ }
+
+ /**
+ * Mark node i as restarting, start its restart deadline, clear any bad-node
+ * marking and log the given message at INFO.
+ */
+ synchronized void initRestartingNode(int i, String message) {
+ restartingNodeIndex = i;
+ restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
+ // If the data streamer has already set the primary node
+ // bad, clear it. It is likely that the write failed due to
+ // the DN shutdown. Even if it was a real failure, the pipeline
+ // recovery will take care of it.
+ badNodeIndex = -1;
+ LOG.info(message);
+ }
+
+ synchronized boolean isRestartingNode() {
+ return restartingNodeIndex >= 0;
+ }
+
+ /** @return true if some node is marked bad or restarting. */
+ synchronized boolean isNodeMarked() {
+ return badNodeIndex >= 0 || isRestartingNode();
+ }
+
+ /**
+ * Used when something failed but no explicit error report identified the
+ * culprit. The first node is suspected and marked as bad, unless a node is
+ * already marked bad or restarting.
+ */
+ synchronized void markFirstNodeIfNotMarked() {
+ // Only mark when no node is already marked bad or restarting.
+ if (!isNodeMarked()) {
+ badNodeIndex = 0;
+ }
+ }
+
+ /**
+ * Adjust the restarting-node index after the bad node has been dealt with.
+ * Then clear the bad-node marking, and clear the error flag unless a node
+ * is still restarting.
+ *
+ * @throws IllegalStateException if the bad node is the restarting node
+ */
+ synchronized void adjustState4RestartingNode() {
+ // Just took care of a node error while waiting for a node restart
+ if (restartingNodeIndex >= 0) {
+ // If the error came from a node further away than the restarting
+ // node, the restart must have been complete.
+ if (badNodeIndex > restartingNodeIndex) {
+ restartingNodeIndex = -1;
+ } else if (badNodeIndex < restartingNodeIndex) {
+ // the node index has shifted.
+ // (presumably the bad node was removed from the pipeline; confirm
+ // against the caller)
+ restartingNodeIndex--;
+ } else {
+ throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+ + " = restartingNodeIndex = " + restartingNodeIndex);
+ }
+ }
+
+ if (!isRestartingNode()) {
+ error = false;
+ }
+ badNodeIndex = -1;
+ }
+
+ /**
+ * If a node is restarting, first ignore any bad-node marking on that same
+ * node. Then, if the restart deadline has passed, stop treating the node
+ * as restarting and mark it bad (unless another node is already bad).
+ *
+ * @param nodes current pipeline, used only for the warning message
+ * @throws IllegalStateException if a node is restarting but no error is set
+ */
+ synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
+ if (restartingNodeIndex >= 0) {
+ if (!error) {
+ throw new IllegalStateException("error=false while checking" +
+ " restarting node deadline");
+ }
+
+ // check badNodeIndex
+ if (badNodeIndex == restartingNodeIndex) {
+ // ignore, if came from the restarting node
+ badNodeIndex = -1;
+ }
+ // not within the deadline
+ if (Time.monotonicNow() >= restartingNodeDeadline) {
+ // expired. declare the restarting node dead
+ restartingNodeDeadline = 0;
+ final int i = restartingNodeIndex;
+ restartingNodeIndex = -1;
+ LOG.warn("Datanode " + i + " did not restart within "
+ + datanodeRestartTimeout + "ms: " + nodes[i]);
+ // Mark the restarting node as failed. If there is any other failed
+ // node during the last pipeline construction attempt, it will not be
+ // overwritten/dropped. In this case, the restarting node will get
+ // excluded in the following attempt, if it still does not come up.
+ if (badNodeIndex == -1) {
+ badNodeIndex = i;
+ }
+ }
+ }
+ }
+ }
+
+ // Set by close(), closeInternal() and on unrecoverable errors in run().
+ private volatile boolean streamerClosed = false;
+ private ExtendedBlock block; // its length is number of bytes acked
+ // Block access token; taken from the last block when appending.
+ private Token<BlockTokenIdentifier> accessToken;
+ // Stream run() writes packets to (first datanode of the pipeline).
+ private DataOutputStream blockStream;
+ // Stream the ResponseProcessor reads pipeline acks from.
+ private DataInputStream blockReplyStream;
+ // Ack-processing thread for the current block; null when none is running.
+ private ResponseProcessor response = null;
+ private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+ // Storage types / IDs of the targets; replaced together with nodes.
+ private volatile StorageType[] storageTypes = null;
+ private volatile String[] storageIDs = null;
+ private final ErrorState errorState;
+
+ private BlockConstructionStage stage; // block construction stage
+ private long bytesSent = 0; // number of bytes that've been sent
+ // Computed once from the file status by isLazyPersist(stat).
+ private final boolean isLazyPersistFile;
+
+ /** Nodes have been used in the pipeline before and have failed. */
+ private final List<DatanodeInfo> failed = new ArrayList<>();
+ /** The last ack sequence number before pipeline failure. */
+ private long lastAckedSeqnoBeforeFailure = -1;
+ private int pipelineRecoveryCount = 0;
+ /** Has the current block been hflushed? */
+ private boolean isHflushed = false;
+ /** Append on an existing block? */
+ private final boolean isAppend;
+
+ private long currentSeqno = 0;
+ private long lastQueuedSeqno = -1;
+ // Seqno of the last acked packet; updated by the ResponseProcessor while
+ // holding the dataQueue lock and waited on by waitForAckedSeqno().
+ private long lastAckedSeqno = -1;
+ private long bytesCurBlock = 0; // bytes written in current block
+ private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
+ // Socket to the first datanode; closed and cleared by closeStream().
+ private Socket s;
+
+ private final DFSClient dfsClient;
+ private final String src;
+ /** Only for DataTransferProtocol.writeBlock(..) */
+ private final DataChecksum checksum4WriteBlock;
+ private final Progressable progress;
+ private final HdfsFileStatus stat;
+ // appending to existing partial block
+ private volatile boolean appendChunk = false;
+ // both dataQueue and ackQueue are protected by dataQueue lock
+ private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
+ private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
+ private final AtomicReference<CachingStrategy> cachingStrategy;
+ private final ByteArrayManager byteArrayManager;
+ //persist blocks on namenode
+ private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
+ // Set by the ResponseProcessor when fault injection fails the last packet.
+ private boolean failPacket = false;
+ // Waits/reads slower than this many ms are logged as warnings.
+ private final long dfsclientSlowLogThresholdMs;
+ // If non-zero, run() sleeps this many ms after each packet (unit tests).
+ private long artificialSlowdown = 0;
+ // List of congested data nodes. The stream will back off if the DataNodes
+ // are congested
+ private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
+ private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
+ private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
+ // Reset to 0 by the ResponseProcessor when an ack reports no congestion.
+ private int lastCongestionBackoffTime;
+
+ // Built by initExcludedNodes() from the configured cache expiry.
+ private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
+ // Supplied by the creator; null for append streamers.
+ private final String[] favoredNodes;
+
+  /**
+   * Common initialization shared by the public constructors. It stores the
+   * collaborators and file properties, then derives the slow-IO threshold,
+   * the excluded-nodes cache and the error state from the client config.
+   */
+  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+      Progressable progress, DataChecksum checksum,
+      AtomicReference<CachingStrategy> cachingStrategy,
+      ByteArrayManager byteArrayManage,
+      boolean isAppend, String[] favoredNodes) {
+    // Collaborators supplied by the caller.
+    this.dfsClient = dfsClient;
+    this.progress = progress;
+    this.checksum4WriteBlock = checksum;
+    this.cachingStrategy = cachingStrategy;
+    this.byteArrayManager = byteArrayManage;
+
+    // Properties of the file being written.
+    this.src = src;
+    this.stat = stat;
+    this.isLazyPersistFile = isLazyPersist(stat);
+    this.isAppend = isAppend;
+    this.favoredNodes = favoredNodes;
+
+    // Settings derived from the client configuration.
+    final DfsClientConf clientConf = dfsClient.getConf();
+    this.dfsclientSlowLogThresholdMs = clientConf.getSlowIoWarningThresholdMs();
+    this.excludedNodes = initExcludedNodes(clientConf.getExcludedNodesCacheExpiry());
+    this.errorState = new ErrorState(clientConf.getDatanodeRestartTimeout());
+  }
+
+  /**
+   * Construct a data streamer that starts in the
+   * {@code PIPELINE_SETUP_CREATE} stage, so it allocates a new block
+   * before streaming.
+   *
+   * @param block initial value of the streamer's current block
+   */
+  DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
+      String src, Progressable progress, DataChecksum checksum,
+      AtomicReference<CachingStrategy> cachingStrategy,
+      ByteArrayManager byteArrayManage, String[] favoredNodes) {
+    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage, false, favoredNodes);
+    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+    this.block = block;
+  }
+
+  /**
+   * Construct a data streamer for appending to the last partial block.
+   * It starts in the {@code PIPELINE_SETUP_APPEND} stage, with the block,
+   * bytes-sent count and access token taken from {@code lastBlock}.
+   *
+   * @param lastBlock last block of the file to be appended
+   * @param stat status of the file to be appended
+   * @throws IOException if error occurs
+   */
+  DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
+      String src, Progressable progress, DataChecksum checksum,
+      AtomicReference<CachingStrategy> cachingStrategy,
+      ByteArrayManager byteArrayManage) throws IOException {
+    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage, true, null);
+    block = lastBlock.getBlock();
+    accessToken = lastBlock.getBlockToken();
+    bytesSent = block.getNumBytes();
+    stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+  }
+
+ /**
+ * Set pipeline in construction
+ *
+ * @param lastBlock the last block of a file
+ * @throws IOException
+ */
+ void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
+ // setup pipeline to append to the last block XXX retries??
+ setPipeline(lastBlock);
+ if (nodes.length < 1) {
+ throw new IOException("Unable to retrieve blocks locations " +
+ " for last block " + block +
+ "of file " + src);
+ }
+ }
+
+ private void setPipeline(LocatedBlock lb) {
+ setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
+ }
+
+  /**
+   * Replace the pipeline targets and their storage types and IDs.
+   * Passing nulls clears the pipeline.
+   */
+  private void setPipeline(DatanodeInfo[] newNodes, StorageType[] newStorageTypes,
+      String[] newStorageIDs) {
+    nodes = newNodes;
+    storageTypes = newStorageTypes;
+    storageIDs = newStorageIDs;
+  }
+
+ /**
+ * Initialize for data streaming
+ */
+ private void initDataStreaming() {
+ this.setName("DataStreamer for file " + src +
+ " block " + block);
+ response = new ResponseProcessor(nodes);
+ response.start();
+ stage = BlockConstructionStage.DATA_STREAMING;
+ }
+
+  /**
+   * Finish the current block:
+   * - stop the responder;
+   * - close the streams;
+   * - clear the pipeline;
+   * - return to the PIPELINE_SETUP_CREATE stage.
+   */
+  private void endBlock() {
+    LOG.debug("Closing old block {}", block);
+    setName("DataStreamer for file " + src);
+    closeResponder();
+    closeStream();
+    setPipeline(null, null, null);
+    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+  }
+
+ private boolean shouldStop() {
+ return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
+ }
+
+ /*
+ * streamer thread is the only thread that opens streams to datanode,
+ * and closes them. Any error recovery is also done by this thread.
+ *
+ * Each loop iteration:
+ * 1. handles datanode errors;
+ * 2. waits for a packet (or builds a heartbeat);
+ * 3. sets up a pipeline if needed;
+ * 4. sends the packet and moves it to the ackQueue;
+ * 5. ends the block once its last packet has been acked.
+ * Any Throwable is recorded in lastException. If no datanode is marked,
+ * the streamer is closed.
+ */
+ @Override
+ public void run() {
+ long lastPacket = Time.monotonicNow();
+ TraceScope scope = NullScope.INSTANCE;
+ while (!streamerClosed && dfsClient.clientRunning) {
+ // if the Responder encountered an error, shutdown Responder
+ if (errorState.hasError() && response != null) {
+ try {
+ response.close();
+ response.join();
+ response = null;
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception", e);
+ }
+ }
+
+ DFSPacket one;
+ try {
+ // process datanode IO errors if any
+ boolean doSleep = processDatanodeError();
+
+ final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
+ synchronized (dataQueue) {
+ // wait for a packet to be sent.
+ // Keep waiting while the queue is empty and either no block is being
+ // streamed, or less than half the socket timeout has passed since the
+ // last packet. Always wait at least once if doSleep was requested.
+ long now = Time.monotonicNow();
+ while ((!shouldStop() && dataQueue.size() == 0 &&
+ (stage != BlockConstructionStage.DATA_STREAMING ||
+ stage == BlockConstructionStage.DATA_STREAMING &&
+ now - lastPacket < halfSocketTimeout)) || doSleep ) {
+ // While streaming, wake up when half the socket timeout since the
+ // last packet has elapsed; otherwise poll every 1000 ms.
+ long timeout = halfSocketTimeout - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+ timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+ timeout : 1000;
+ try {
+ dataQueue.wait(timeout);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception", e);
+ }
+ doSleep = false;
+ now = Time.monotonicNow();
+ }
+ if (shouldStop()) {
+ continue;
+ }
+ // get packet to be sent.
+ // An empty queue here means the streaming wait timed out: send a
+ // heartbeat packet instead of data.
+ if (dataQueue.isEmpty()) {
+ one = createHeartbeatPacket();
+ } else {
+ try {
+ backOffIfNecessary();
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception", e);
+ }
+ one = dataQueue.getFirst(); // regular data packet
+ long parents[] = one.getTraceParents();
+ if (parents.length > 0) {
+ scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
+ // TODO: use setParents API once it's available from HTrace 3.2
+ // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
+ // scope.getSpan().setParents(parents);
+ }
+ }
+ }
+
+ // get new block from namenode.
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Allocating new block");
+ }
+ setPipeline(nextBlockOutputStream());
+ initDataStreaming();
+ } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Append to block " + block);
+ }
+ setupPipelineForAppendOrRecovery();
+ if (streamerClosed) {
+ continue;
+ }
+ initDataStreaming();
+ }
+
+ // A packet must never extend past the configured block size.
+ long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+ if (lastByteOffsetInBlock > stat.getBlockSize()) {
+ throw new IOException("BlockSize " + stat.getBlockSize() +
+ " is smaller than data size. " +
+ " Offset of packet in block " +
+ lastByteOffsetInBlock +
+ " Aborting file " + src);
+ }
+
+ // Before sending the last packet of a block, drain the ackQueue and
+ // switch the stage to PIPELINE_CLOSE.
+ if (one.isLastPacketInBlock()) {
+ // wait for all data packets have been successfully acked
+ synchronized (dataQueue) {
+ while (!shouldStop() && ackQueue.size() != 0) {
+ try {
+ // wait for acks to arrive from datanodes
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Caught exception", e);
+ }
+ }
+ }
+ if (shouldStop()) {
+ continue;
+ }
+ stage = BlockConstructionStage.PIPELINE_CLOSE;
+ }
+
+ // send the packet
+ // Heartbeats stay out of the queues; data packets move to ackQueue
+ // before they are written.
+ Span span = null;
+ synchronized (dataQueue) {
+ // move packet from dataQueue to ackQueue
+ if (!one.isHeartbeatPacket()) {
+ span = scope.detach();
+ one.setTraceSpan(span);
+ dataQueue.removeFirst();
+ ackQueue.addLast(one);
+ dataQueue.notifyAll();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " sending packet " + one);
+ }
+
+ // write out data to remote datanode
+ TraceScope writeScope = Trace.startSpan("writeTo", span);
+ try {
+ one.writeTo(blockStream);
+ blockStream.flush();
+ } catch (IOException e) {
+ // HDFS-3398 treat primary DN is down since client is unable to
+ // write to primary DN. If a failed or restarting node has already
+ // been recorded by the responder, the following call will have no
+ // effect. Pipeline recovery can handle only one node error at a
+ // time. If the primary node fails again during the recovery, it
+ // will be taken out then.
+ errorState.markFirstNodeIfNotMarked();
+ throw e;
+ } finally {
+ writeScope.close();
+ }
+ lastPacket = Time.monotonicNow();
+
+ // update bytesSent
+ long tmpBytesSent = one.getLastByteOffsetBlock();
+ if (bytesSent < tmpBytesSent) {
+ bytesSent = tmpBytesSent;
+ }
+
+ if (shouldStop()) {
+ continue;
+ }
+
+ // Is this block full?
+ if (one.isLastPacketInBlock()) {
+ // wait for the close packet has been acked
+ // (an InterruptedException here reaches the catch-all below)
+ synchronized (dataQueue) {
+ while (!shouldStop() && ackQueue.size() != 0) {
+ dataQueue.wait(1000);// wait for acks to arrive from datanodes
+ }
+ }
+ if (shouldStop()) {
+ continue;
+ }
+
+ endBlock();
+ }
+ if (progress != null) { progress.progress(); }
+
+ // This is used by unit test to trigger race conditions.
+ if (artificialSlowdown != 0 && dfsClient.clientRunning) {
+ Thread.sleep(artificialSlowdown);
+ }
+ } catch (Throwable e) {
+ // Log warning if there was a real error.
+ if (!errorState.isRestartingNode()) {
+ // Since their messages are descriptive enough, do not always
+ // log a verbose stack-trace WARN for quota exceptions.
+ if (e instanceof QuotaExceededException) {
+ LOG.debug("DataStreamer Quota Exception", e);
+ } else {
+ LOG.warn("DataStreamer Exception", e);
+ }
+ }
+ lastException.set(e);
+ assert !(e instanceof NullPointerException);
+ errorState.setError(true);
+ // A marked node makes this a datanode error, which
+ // processDatanodeError() handles on the next iteration.
+ if (!errorState.isNodeMarked()) {
+ // Not a datanode issue
+ streamerClosed = true;
+ }
+ } finally {
+ scope.close();
+ }
+ }
+ closeInternal();
+ }
+
+ private void closeInternal() {
+ closeResponder(); // close and join
+ closeStream();
+ streamerClosed = true;
+ release();
+ synchronized (dataQueue) {
+ dataQueue.notifyAll();
+ }
+ }
+
+ /**
+ * release the DFSPackets in the two queues
+ *
+ */
+ void release() {
+ synchronized (dataQueue) {
+ releaseBuffer(dataQueue, byteArrayManager);
+ releaseBuffer(ackQueue, byteArrayManager);
+ }
+ }
+
+ /**
+ * wait for the ack of seqno
+ *
+ * @param seqno the sequence number to be acked
+ * @throws IOException
+ */
+ void waitForAckedSeqno(long seqno) throws IOException {
+ TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for ack for: " + seqno);
+ }
+ long begin = Time.monotonicNow();
+ try {
+ synchronized (dataQueue) {
+ while (!streamerClosed) {
+ checkClosed();
+ if (lastAckedSeqno >= seqno) {
+ break;
+ }
+ try {
+ dataQueue.wait(1000); // when we receive an ack, we notify on
+ // dataQueue
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException(
+ "Interrupted while waiting for data to be acknowledged by pipeline");
+ }
+ }
+ }
+ checkClosed();
+ } catch (ClosedChannelException e) {
+ }
+ long duration = Time.monotonicNow() - begin;
+ if (duration > dfsclientSlowLogThresholdMs) {
+ LOG.warn("Slow waitForAckedSeqno took " + duration
+ + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
+ }
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * wait for space of dataQueue and queue the packet
+ *
+ * @param packet the DFSPacket to be queued
+ * @throws IOException
+ */
+ void waitAndQueuePacket(DFSPacket packet) throws IOException {
+ synchronized (dataQueue) {
+ try {
+ // If queue is full, then wait till we have enough space
+ boolean firstWait = true;
+ try {
+ while (!streamerClosed && dataQueue.size() + ackQueue.size() >
+ dfsClient.getConf().getWriteMaxPackets()) {
+ if (firstWait) {
+ Span span = Trace.currentSpan();
+ if (span != null) {
+ span.addTimelineAnnotation("dataQueue.wait");
+ }
+ firstWait = false;
+ }
+ try {
+ dataQueue.wait();
+ } catch (InterruptedException e) {
+ // If we get interrupted while waiting to queue data, we still need to get rid
+ // of the current packet. This is because we have an invariant that if
+ // currentPacket gets full, it will get queued before the next writeChunk.
+ //
+ // Rather than wait around for space in the queue, we should instead try to
+ // return to the caller as soon as possible, even though we slightly overrun
+ // the MAX_PACKETS length.
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ } finally {
+ Span span = Trace.currentSpan();
+ if ((span != null) && (!firstWait)) {
+ span.addTimelineAnnotation("end.wait");
+ }
+ }
+ checkClosed();
+ queuePacket(packet);
+ } catch (ClosedChannelException e) {
+ }
+ }
+ }
+
+  /**
+   * Close the streamer. This must be called only by an external thread, and
+   * only after all data to be sent has been flushed to the datanodes.
+   *
+   * @param force if true, also interrupt the streamer thread
+   */
+  void close(boolean force) {
+    streamerClosed = true;
+    synchronized (dataQueue) {
+      // wake up threads blocked on the queues so they see the close
+      dataQueue.notifyAll();
+    }
+    if (!force) {
+      return;
+    }
+    this.interrupt();
+  }
+
+
+  /**
+   * Do nothing while the streamer is open. Once it is closed, rethrow the
+   * recorded exception, or a ClosedChannelException if none was recorded.
+   */
+  private void checkClosed() throws IOException {
+    if (!streamerClosed) {
+      return;
+    }
+    lastException.throwException4Close();
+  }
+
+  /**
+   * Stop the ResponseProcessor, if one is running, and wait for it to exit.
+   * The field is cleared even if the wait is interrupted.
+   */
+  private void closeResponder() {
+    if (response == null) {
+      return;
+    }
+    try {
+      response.close();
+      response.join();
+    } catch (InterruptedException e) {
+      LOG.warn("Caught exception", e);
+    } finally {
+      response = null;
+    }
+  }
+
+  /**
+   * Close the block output stream, the reply input stream and the socket,
+   * clearing each field. A failing close does not prevent the others. The
+   * collected failures are recorded in lastException.
+   */
+  private void closeStream() {
+    final MultipleIOException.Builder errors = new MultipleIOException.Builder();
+
+    if (null != blockStream) {
+      try {
+        blockStream.close();
+      } catch (IOException ioe) {
+        errors.add(ioe);
+      } finally {
+        blockStream = null;
+      }
+    }
+
+    if (null != blockReplyStream) {
+      try {
+        blockReplyStream.close();
+      } catch (IOException ioe) {
+        errors.add(ioe);
+      } finally {
+        blockReplyStream = null;
+      }
+    }
+
+    if (null != s) {
+      try {
+        s.close();
+      } catch (IOException ioe) {
+        errors.add(ioe);
+      } finally {
+        s = null;
+      }
+    }
+
+    final IOException collected = errors.build();
+    if (null != collected) {
+      lastException.set(collected);
+    }
+  }
+
+  /**
+   * Examine whether it is worth waiting for a node to restart. That is the
+   * case when the pipeline has a single node, or when the node's IP address
+   * is a local address.
+   *
+   * @param index the node index
+   */
+  boolean shouldWaitForRestart(int index) {
+    // Only one node in the pipeline.
+    if (nodes.length == 1) {
+      return true;
+    }
+
+    // Is it a local node?
+    InetAddress address;
+    try {
+      address = InetAddress.getByName(nodes[index].getIpAddr());
+    } catch (java.net.UnknownHostException e) {
+      // we are passing an ip address. this should not happen.
+      assert false;
+      address = null;
+    }
+    return address != null && NetUtils.isLocalAddress(address);
+  }
+
+ //
+ // Processes responses from the datanodes. A packet is removed
+ // from the ackQueue when its response arrives.
+ //
+ private class ResponseProcessor extends Daemon {
+
+ private volatile boolean responderClosed = false;
+ private DatanodeInfo[] targets = null;
+ private boolean isLastPacketInBlock = false;
+
+ ResponseProcessor (DatanodeInfo[] targets) {
+ this.targets = targets;
+ }
+
+    /**
+     * Main loop of the responder thread.
+     *
+     * Repeatedly reads a {@link PipelineAck} from {@code blockReplyStream}
+     * until the responder is closed, the client stops running, or the ack for
+     * the last packet of the block has been processed. For each ack it:
+     * <ul>
+     * <li>warns if reading a non-heartbeat ack took longer than
+     * {@code dfsclientSlowLogThresholdMs};</li>
+     * <li>replaces {@code congestedNodes} with the nodes that reported ECN
+     * congestion, or clears it and resets the backoff time when none did;</li>
+     * <li>throws an IOException on a restart OOB status (when
+     * {@code shouldWaitForRestart} agrees) or on any non-SUCCESS reply;</li>
+     * <li>for a data ack, checks its seqno against the head of
+     * {@code ackQueue}, records the acked length on {@code block}, removes
+     * the packet and wakes up threads waiting on {@code dataQueue}.</li>
+     * </ul>
+     * Any exception raised while the responder is still open is stored in
+     * {@code lastException}, flagged in {@code errorState}, and ends the loop
+     * by setting {@code responderClosed}.
+     */
+    @Override
+    public void run() {
+
+      setName("ResponseProcessor for block " + block);
+      PipelineAck ack = new PipelineAck();
+
+      // NOTE(review): scope is only reassigned for data acks and is not reset
+      // after close() in the finally block, so iterations that do not
+      // reassign it (heartbeat acks, failures) close the previous scope
+      // again. Confirm TraceScope.close() tolerates repeated calls.
+      TraceScope scope = NullScope.INSTANCE;
+      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
+        // process responses from datanodes.
+        try {
+          // read an ack from the pipeline
+          long begin = Time.monotonicNow();
+          ack.readFields(blockReplyStream);
+          long duration = Time.monotonicNow() - begin;
+          if (duration > dfsclientSlowLogThresholdMs
+              && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
+            LOG.warn("Slow ReadProcessor read fields took " + duration
+                + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+                + ack + ", targets: " + Arrays.asList(targets));
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("DFSClient " + ack);
+          }
+
+          long seqno = ack.getSeqno();
+          // Check each datanode's reply, from the last reply back to the
+          // first, collecting the nodes that report congestion.
+          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
+          for (int i = ack.getNumOfReplies() - 1; i >= 0 && dfsClient.clientRunning; i--) {
+            final Status reply = PipelineAck.getStatusFromHeader(ack
+                .getHeaderFlag(i));
+            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
+                PipelineAck.ECN.CONGESTED) {
+              congestedNodesFromAck.add(targets[i]);
+            }
+            // Restart will not be treated differently unless it is
+            // the local node or the only one in the pipeline.
+            if (PipelineAck.isRestartOOBStatus(reply) &&
+                shouldWaitForRestart(i)) {
+              final String message = "Datanode " + i + " is restarting: "
+                  + targets[i];
+              errorState.initRestartingNode(i, message);
+              throw new IOException(message);
+            }
+            // node error
+            if (reply != SUCCESS) {
+              errorState.setBadNodeIndex(i); // mark bad datanode
+              throw new IOException("Bad response " + reply +
+                  " for " + block + " from datanode " + targets[i]);
+            }
+          }
+
+          // Publish the congestion seen in this ack; backOffIfNecessary()
+          // reads congestedNodes and lastCongestionBackoffTime.
+          if (!congestedNodesFromAck.isEmpty()) {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              congestedNodes.addAll(congestedNodesFromAck);
+            }
+          } else {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              lastCongestionBackoffTime = 0;
+            }
+          }
+
+          assert seqno != PipelineAck.UNKOWN_SEQNO :
+              "Ack for unknown seqno should be a failed ack: " + ack;
+          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
+            continue;
+          }
+
+          // a success ack for a data packet
+          DFSPacket one;
+          synchronized (dataQueue) {
+            one = ackQueue.getFirst();
+          }
+          if (one.getSeqno() != seqno) {
+            throw new IOException("ResponseProcessor: Expecting seqno " +
+                one.getSeqno() + " for block " + block +
+                " but received " + seqno);
+          }
+          isLastPacketInBlock = one.isLastPacketInBlock();
+
+          // Fail the packet write for testing in order to force a
+          // pipeline recovery.
+          if (DFSClientFaultInjector.get().failPacket() &&
+              isLastPacketInBlock) {
+            failPacket = true;
+            throw new IOException(
+                "Failing the last packet for testing.");
+          }
+
+          // update bytesAcked
+          block.setNumBytes(one.getLastByteOffsetBlock());
+
+          synchronized (dataQueue) {
+            scope = Trace.continueSpan(one.getTraceSpan());
+            one.setTraceSpan(null);
+            lastAckedSeqno = seqno;
+            ackQueue.removeFirst();
+            dataQueue.notifyAll();
+
+            one.releaseBuffer(byteArrayManager);
+          }
+        } catch (Exception e) {
+          if (!responderClosed) {
+            lastException.set(e);
+            errorState.setError(true);
+            errorState.markFirstNodeIfNotMarked();
+            synchronized (dataQueue) {
+              dataQueue.notifyAll();
+            }
+            if (!errorState.isRestartingNode()) {
+              LOG.warn("Exception for " + block, e);
+            }
+            responderClosed = true;
+          }
+        } finally {
+          scope.close();
+        }
+      }
+    }
+
+    /**
+     * Stops this responder. The closed flag is set before the thread is
+     * interrupted, presumably so that a blocked ack read is woken up and the
+     * loop in run() then observes the flag.
+     */
+    void close() {
+      this.responderClosed = true;
+      this.interrupt();
+    }
+ }
+
+ /**
+ * Recover from a datanode error reported in errorState, if there is one.
+ *
+ * If no datanode error is pending, this returns false immediately. If the
+ * ResponseProcessor ({@code response}) has not exited yet, it only logs and
+ * returns true so the caller can wait. Otherwise it closes the current
+ * streams, moves every unacked packet from ackQueue back to the front of
+ * dataQueue, and rebuilds the pipeline with
+ * setupPipelineForAppendOrRecovery(). If pipeline recovery keeps repeating
+ * without lastAckedSeqno advancing (more than 5 consecutive recoveries), the
+ * streamer is closed and the failure recorded in lastException instead.
+ *
+ * @return true if it should sleep for a while after returning.
+ * @throws IOException propagated from the pipeline setup and block-ending
+ * helpers called here.
+ */
+ private boolean processDatanodeError() throws IOException {
+ if (!errorState.hasDatanodeError()) {
+ return false;
+ }
+ // Recovery must wait until the responder has exited.
+ if (response != null) {
+ LOG.info("Error Recovery for " + block +
+ " waiting for responder to exit. ");
+ return true;
+ }
+ closeStream();
+
+ // move packets from ack queue to front of the data queue
+ synchronized (dataQueue) {
+ dataQueue.addAll(0, ackQueue);
+ ackQueue.clear();
+ }
+
+ // Record the new pipeline failure recovery.
+ // pipelineRecoveryCount counts consecutive recoveries during which
+ // lastAckedSeqno did not change.
+ if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
+ lastAckedSeqnoBeforeFailure = lastAckedSeqno;
+ pipelineRecoveryCount = 1;
+ } else {
+ // If we keep having to recover the pipeline for the same packet, this
+ // client likely has corrupt data, or the data is being corrupted
+ // during transmission.
+ if (++pipelineRecoveryCount > 5) {
+ LOG.warn("Error recovering pipeline for writing " +
+ block + ". Already retried 5 times for the same packet.");
+ lastException.set(new IOException("Failing write. Tried pipeline " +
+ "recovery 5 times without success."));
+ streamerClosed = true;
+ return false;
+ }
+ }
+ boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ if (!streamerClosed && dfsClient.clientRunning) {
+ if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+
+ // If we had an error while closing the pipeline, we go through a fast-path
+ // where the BlockReceiver does not run. Instead, the DataNode just finalizes
+ // the block immediately during the 'connect ack' process. So, we want to pull
+ // the end-of-block packet from the dataQueue, since we don't actually have
+ // a true pipeline to send it over.
+ //
+ // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
+ // a client waiting on close() will be aware that the flush finished.
+ synchronized (dataQueue) {
+ DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
+ Span span = endOfBlockPacket.getTraceSpan();
+ if (span != null) {
+ // Close any trace span associated with this Packet
+ TraceScope scope = Trace.continueSpan(span);
+ scope.close();
+ }
+ assert endOfBlockPacket.isLastPacketInBlock();
+ assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
+ lastAckedSeqno = endOfBlockPacket.getSeqno();
+ dataQueue.notifyAll();
+ }
+ endBlock();
+ } else {
+ // Not closing: resume streaming on the rebuilt pipeline.
+ initDataStreaming();
+ }
+ }
+
+ return doSleep;
+ }
+
+  /** Marks this stream as hflushed by setting {@code isHflushed}. */
+  void setHflush() {
+    this.isHflushed = true;
+  }
+
+  /**
+   * Locates the datanode that the namenode added to the pipeline.
+   *
+   * @param original the pipeline as it was before the namenode added a node
+   * @return index in {@code nodes} of the first node not present in
+   *     {@code original}
+   * @throws IOException if {@code nodes} is not exactly one element longer
+   *     than {@code original}, or if every node already appears in
+   *     {@code original}
+   */
+  private int findNewDatanode(final DatanodeInfo[] original)
+      throws IOException {
+    if (nodes.length != original.length + 1) {
+      throw new IOException(
+          "Failed to replace a bad datanode on the existing pipeline "
+          + "due to no more good datanodes being available to try. "
+          + "(Nodes: current=" + Arrays.asList(nodes)
+          + ", original=" + Arrays.asList(original) + "). "
+          + "The current failed datanode replacement policy is "
+          + dfsClient.dtpReplaceDatanodeOnFailure + ", and "
+          + "a client may configure this via '"
+          + BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY
+          + "' in its configuration.");
+    }
+    candidates:
+    for (int i = 0; i < nodes.length; i++) {
+      for (DatanodeInfo existing : original) {
+        if (nodes[i].equals(existing)) {
+          // Already part of the old pipeline; try the next candidate.
+          continue candidates;
+        }
+      }
+      return i;
+    }
+    throw new IOException("Failed: new datanode not found: nodes="
+        + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+  }
+
+ private void addDatanode2ExistingPipeline() throws IOException {
+ if (DataTransferProtocol.LOG.isDebugEnabled()) {
+ DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+ }
+ /*
+ * Is data transfer necessary? We have the following cases.
+ *
+ * Case 1: Failure in Pipeline Setup
+ * - Append
+ * + Transfer the stored replica, which may be a RBW or a finalized.
+ * - Create
+ * + If no data, then no transfer is required.
+ * + If there are data written, transfer RBW. This case may happens
+ * when there are streaming failure earlier in this pipeline.
+ *
+ * Case 2: Failure in Streaming
+ * - Append/Create:
+ * + transfer RBW
+ *
+ * Case 3: Failure in Close
+ * - Append/Create:
+ * + no transfer, let NameNode replicates the block.
+ */
+ if (!isAppend && lastAckedSeqno < 0
+ && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+ //no data have been written
+ return;
+ } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+ || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ //pipeline is closing
+ return;
+ }
+
+ //get a new datanode
+ final DatanodeInfo[] original = nodes;
+ final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+ src, stat.getFileId(), block, nodes, storageIDs,
+ failed.toArray(new DatanodeInfo[failed.size()]),
+ 1, dfsClient.clientName);
+ setPipeline(lb);
+
+ //find the new datanode
+ final int d = findNewDatanode(original);
+
+ //transfer replica
+ final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+ final DatanodeInfo[] targets = {nodes[d]};
+ final StorageType[] targetStorageTypes = {storageTypes[d]};
+ transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+ }
+
+  /**
+   * Asks {@code src} to copy the current block to {@code targets} with a
+   * TRANSFER_BLOCK request and waits for the datanode's response.
+   *
+   * @param src datanode that already holds the replica
+   * @param targets datanodes that should receive the replica
+   * @param targetStorageTypes storage type for each target
+   * @param blockToken access token sent with the request
+   * @throws IOException on connection or SASL failure, or when the datanode
+   *     answers with a non-SUCCESS status
+   */
+  private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    Socket socket = null;
+    DataOutputStream requestOut = null;
+    DataInputStream replyIn = null;
+    try {
+      // Socket and timeouts are sized as for a two-node pipeline.
+      socket = createSocketForPipeline(src, 2, dfsClient);
+      final long writeTimeoutMs = dfsClient.getDatanodeWriteTimeout(2);
+      final long readTimeoutMs = dfsClient.getDatanodeReadTimeout(2);
+
+      final OutputStream rawOut = NetUtils.getOutputStream(socket, writeTimeoutMs);
+      final InputStream rawIn = NetUtils.getInputStream(socket, readTimeoutMs);
+      final IOStreamPair negotiated = dfsClient.saslClient.socketSend(socket,
+          rawOut, rawIn, dfsClient, blockToken, src);
+      requestOut = new DataOutputStream(new BufferedOutputStream(negotiated.out,
+          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
+      replyIn = new DataInputStream(negotiated.in);
+
+      new Sender(requestOut).transferBlock(block, blockToken,
+          dfsClient.clientName, targets, targetStorageTypes);
+      requestOut.flush();
+
+      final BlockOpResponseProto reply =
+          BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(replyIn));
+      if (reply.getStatus() != SUCCESS) {
+        throw new IOException("Failed to add a datanode");
+      }
+    } finally {
+      // Quiet closes: cleanup failures must not mask the primary outcome.
+      IOUtils.closeStream(replyIn);
+      IOUtils.closeStream(requestOut);
+      IOUtils.closeSocket(socket);
+    }
+  }
+
+  /**
+   * Sets up the pipeline for an append, or rebuilds it after a streaming
+   * failure.
+   *
+   * Keeps trying until a pipeline is established, the streamer is closed, or
+   * the client stops. Each round waits for a restarting datanode, drops the
+   * bad datanode, optionally adds a replacement, fetches a new generation
+   * stamp and access token, and reconnects. On success the new pipeline is
+   * reported to the namenode and {@code block} is updated.
+   *
+   * @return always false: the caller should continue without sleeping
+   * @throws IOException from the namenode calls or datanode replacement
+   */
+  private boolean setupPipelineForAppendOrRecovery() throws IOException {
+    if (nodes == null || nodes.length == 0) {
+      final String msg = "Could not get block locations. " + "Source file \""
+          + src + "\" - Aborting...";
+      LOG.warn(msg);
+      lastException.set(new IOException(msg));
+      streamerClosed = true;
+      return false;
+    }
+
+    boolean connected = false;
+    long newGS = 0L;
+    while (!connected && !streamerClosed && dfsClient.clientRunning) {
+      if (!handleRestartingDatanode()) {
+        return false;
+      }
+      // Sampled before handleBadDatanode() touches the error state.
+      final boolean isRecovery = errorState.hasError();
+      if (!handleBadDatanode()) {
+        return false;
+      }
+      handleDatanodeReplacement();
+
+      // New generation stamp and access token for the rebuilt pipeline.
+      final LocatedBlock lb = updateBlockForPipeline();
+      newGS = lb.getBlock().getGenerationStamp();
+      accessToken = lb.getBlockToken();
+
+      connected = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
+      failPacket4Testing();
+      errorState.checkRestartingNodeDeadline(nodes);
+    }
+
+    if (connected) {
+      block = updatePipeline(newGS);
+    }
+    return false;
+  }
+
+  /**
+   * Waits one retry interval if a datanode in the pipeline is restarting.
+   *
+   * The interval is {@code errorState.datanodeRestartTimeout} or 4000 ms,
+   * whichever is smaller; recovery is retried at this interval until the
+   * restart deadline passes or recovery succeeds.
+   *
+   * @return false if the wait was interrupted (the streamer is then closed
+   *     and the cause recorded in lastException), true otherwise
+   */
+  private boolean handleRestartingDatanode() {
+    if (!errorState.isRestartingNode()) {
+      return true;
+    }
+    final long retryIntervalMs =
+        Math.min(errorState.datanodeRestartTimeout, 4000L);
+    try {
+      Thread.sleep(retryIntervalMs);
+    } catch (InterruptedException ie) {
+      lastException.set(new IOException(
+          "Interrupted while waiting for restarting "
+          + nodes[errorState.getRestartingNodeIndex()]));
+      streamerClosed = true;
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Removes the datanode flagged as bad in errorState from the pipeline.
+   *
+   * The removed node is remembered in {@code failed}; its storage type and
+   * storage ID are dropped together with it.
+   *
+   * @return false if the bad node was the only one left (the streamer is then
+   *     closed), true otherwise, including when no node is flagged
+   */
+  private boolean handleBadDatanode() {
+    final int bad = errorState.getBadNodeIndex();
+    if (bad < 0) {
+      return true;
+    }
+    if (nodes.length <= 1) {
+      lastException.set(new IOException("All datanodes "
+          + Arrays.toString(nodes) + " are bad. Aborting..."));
+      streamerClosed = true;
+      return false;
+    }
+
+    LOG.warn("Error Recovery for " + block + " in pipeline "
+        + Arrays.toString(nodes) + ": datanode " + bad
+        + "(" + nodes[bad] + ") is bad.");
+    failed.add(nodes[bad]);
+
+    final int remaining = nodes.length - 1;
+    final DatanodeInfo[] survivors = new DatanodeInfo[remaining];
+    arraycopy(nodes, survivors, bad);
+    final StorageType[] survivorTypes = new StorageType[remaining];
+    arraycopy(storageTypes, survivorTypes, bad);
+    final String[] survivorIDs = new String[remaining];
+    arraycopy(storageIDs, survivorIDs, bad);
+    setPipeline(survivors, survivorTypes, survivorIDs);
+
+    errorState.adjustState4RestartingNode();
+    lastException.clear();
+    return true;
+  }
+
+ /** Add a datanode if replace-datanode policy is satisfied. */
+ private void handleDatanodeReplacement() throws IOException {
+ if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
+ nodes, isAppend, isHflushed)) {
+ try {
+ addDatanode2ExistingPipeline();
+ } catch(IOException ioe) {
+ if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
+ throw ioe;
+ }
+ LOG.warn("Failed to replace datanode."
+ + " Continue with the remaining datanodes since "
+ + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
+ + " is set to true.", ioe);
+ }
+ }
+ }
+
+  /**
+   * Test hook: after run() injected a packet failure (setting
+   * {@code failPacket}), clears the flag and pauses once for 2 seconds.
+   */
+  private void failPacket4Testing() {
+    if (!failPacket) {
+      return;
+    }
+    failPacket = false;
+    try {
+      // Give DNs time to send in bad reports. In real situations,
+      // good reports should follow bad ones, if client committed
+      // with those nodes.
+      Thread.sleep(2000);
+    } catch (InterruptedException ignored) {
+      // Test-only delay: an interrupt just ends the wait early.
+    }
+  }
+
+  /**
+   * Asks the namenode to update the current block for a new pipeline; the
+   * result carries the new generation stamp and access token.
+   *
+   * @return the block returned by the namenode
+   */
+  LocatedBlock updateBlockForPipeline() throws IOException {
+    final String clientName = dfsClient.clientName;
+    return dfsClient.namenode.updateBlockForPipeline(block, clientName);
+  }
+
+  /**
+   * Reports the rebuilt pipeline to the namenode.
+   *
+   * @param newGS generation stamp of the updated block
+   * @return a copy of the current block carrying {@code newGS}
+   */
+  ExtendedBlock updatePipeline(long newGS) throws IOException {
+    final ExtendedBlock updated = new ExtendedBlock(block.getBlockPoolId(),
+        block.getBlockId(), block.getNumBytes(), newGS);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, updated,
+        nodes, storageIDs);
+    return updated;
+  }
+
+  /**
+   * Allocates the next block from the namenode and opens a write pipeline
+   * to it.
+   *
+   * Each attempt asks the namenode for a new block, excluding the datanodes
+   * currently in {@code excludedNodes}, and connects to the returned
+   * pipeline. If connecting fails, the block is abandoned, the datanode
+   * blamed in {@code errorState} (if any) is added to {@code excludedNodes},
+   * and allocation is retried, up to {@code getNumBlockWriteRetry()}
+   * additional times.
+   *
+   * @return the located block whose pipeline was opened
+   * @throws IOException if no pipeline could be opened after all retries, or
+   *     if a namenode call fails
+   */
+  private LocatedBlock nextBlockOutputStream() throws IOException {
+    LocatedBlock lb = null;
+    DatanodeInfo[] nodes = null;
+    StorageType[] storageTypes = null;
+    int count = dfsClient.getConf().getNumBlockWriteRetry();
+    boolean success = false;
+    ExtendedBlock oldBlock = block;
+    do {
+      errorState.reset();
+      lastException.clear();
+      success = false;
+
+      DatanodeInfo[] excluded =
+          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+              .keySet()
+              .toArray(new DatanodeInfo[0]);
+      // locateFollowingBlock() passes `block` to addBlock, so restore the
+      // previous block before every attempt.
+      block = oldBlock;
+      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
+      block = lb.getBlock();
+      block.setNumBytes(0);
+      bytesSent = 0;
+      accessToken = lb.getBlockToken();
+      nodes = lb.getLocations();
+      storageTypes = lb.getStorageTypes();
+
+      //
+      // Connect to first DataNode in the list.
+      //
+      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
+
+      if (!success) {
+        LOG.info("Abandoning " + block);
+        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
+            dfsClient.clientName);
+        block = null;
+        // createBlockOutputStream() can fail without blaming a node (an empty
+        // pipeline, or a firstBadLink matching no node). A negative index
+        // means "no bad node" (cf. the >= 0 check in handleBadDatanode());
+        // indexing with it would throw instead of retrying.
+        final int badNodeIndex = errorState.getBadNodeIndex();
+        if (badNodeIndex >= 0 && badNodeIndex < nodes.length) {
+          final DatanodeInfo badNode = nodes[badNodeIndex];
+          LOG.info("Excluding datanode " + badNode);
+          excludedNodes.put(badNode, badNode);
+        } else {
+          LOG.info("No datanode to exclude for failed pipeline "
+              + Arrays.toString(nodes) + " (bad node index "
+              + badNodeIndex + ")");
+        }
+      }
+    } while (!success && --count >= 0);
+
+    if (!success) {
+      throw new IOException("Unable to create new block.");
+    }
+    return lb;
+  }
+
+ /**
+ * Connect to the first datanode in {@code nodes} and send the writeBlock
+ * request that sets up the write pipeline.
+ *
+ * On success, {@code s}, {@code blockStream} and {@code blockReplyStream}
+ * stay open for streaming and errorState is reset. On failure they are
+ * closed, lastException is set, errorState is flagged, and the bad node
+ * index is taken from the datanode's firstBadLink (index 0 when none is
+ * reported). A restart OOB status may instead mark the node as restarting.
+ * An InvalidEncryptionKeyException triggers one retry with a freshly
+ * fetched encryption key.
+ *
+ * @param nodes the pipeline datanodes; only the first one is contacted
+ * @param nodeStorageTypes storage type of each pipeline datanode
+ * @param newGS generation stamp sent with the request (callers allocating
+ * a new block pass 0)
+ * @param recoveryFlag if true, send the recovery variant of {@code stage}
+ * @return true if the pipeline was set up, false otherwise
+ */
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+ StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
+ if (nodes.length == 0) {
+ LOG.info("nodes are empty for write pipeline of " + block);
+ return false;
+ }
+ Status pipelineStatus = SUCCESS;
+ String firstBadLink = "";
+ boolean checkRestart = false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pipeline = " + Arrays.asList(nodes));
+ }
+
+ // persist blocks on namenode on next flush
+ persistBlocks.set(true);
+
+ // The loop only repeats after refreshing an invalid encryption key;
+ // every other path returns after a single attempt.
+ int refetchEncryptionKey = 1;
+ while (true) {
+ boolean result = false;
+ DataOutputStream out = null;
+ try {
+ assert null == s : "Previous socket unclosed";
+ assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+ s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+ long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+ long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
+ IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+ unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
+ blockReplyStream = new DataInputStream(unbufIn);
+
+ //
+ // Xmit header info to datanode
+ //
+
+ BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
+
+ // We cannot change the block length in 'block' as it counts the number
+ // of bytes ack'ed.
+ ExtendedBlock blockCopy = new ExtendedBlock(block);
+ blockCopy.setNumBytes(stat.getBlockSize());
+
+ boolean[] targetPinnings = getPinnings(nodes, true);
+ // send the request
+ new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
+ dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
+ nodes.length, block.getNumBytes(), bytesSent, newGS,
+ checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+ (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
+
+ // receive ack for connect
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(blockReplyStream));
+ pipelineStatus = resp.getStatus();
+ firstBadLink = resp.getFirstBadLink();
+
+ // Got an restart OOB ack.
+ // If a node is already restarting, this status is not likely from
+ // the same node. If it is from a different node, it is not
+ // from the local datanode. Thus it is safe to treat this as a
+ // regular node error.
+ if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
+ !errorState.isRestartingNode()) {
+ checkRestart = true;
+ throw new IOException("A datanode is restarting.");
+ }
+
+ String logInfo = "ack with firstBadLink as " + firstBadLink;
+ DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
+
+ assert null == blockStream : "Previous blockStream unclosed";
+ blockStream = out;
+ result = true; // success
+ errorState.reset();
+ } catch (IOException ie) {
+ if (!errorState.isRestartingNode()) {
+ LOG.info("Exception in createBlockOutputStream", ie);
+ }
+ if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to "
+ + nodes[0] + " : " + ie);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ // Don't exclude this node just yet. Try again with a new encryption key.
+ // NOTE(review): result is still false here, so the finally block below
+ // does close s/out/blockReplyStream before the retry reopens them.
+ continue;
+ }
+
+ // find the datanode that matches
+ // NOTE(review): if firstBadLink matches no node, the bad node index is
+ // left as errorState.reset() set it; confirm shouldWaitForRestart() and
+ // callers handle that value.
+ if (firstBadLink.length() != 0) {
+ for (int i = 0; i < nodes.length; i++) {
+ // NB: Unconditionally using the xfer addr w/o hostname
+ if (firstBadLink.equals(nodes[i].getXferAddr())) {
+ errorState.setBadNodeIndex(i);
+ break;
+ }
+ }
+ } else {
+ assert checkRestart == false;
+ errorState.setBadNodeIndex(0);
+ }
+
+ final int i = errorState.getBadNodeIndex();
+ // Check whether there is a restart worth waiting for.
+ if (checkRestart && shouldWaitForRestart(i)) {
+ errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
+ }
+ errorState.setError(true);
+ lastException.set(ie);
+ result = false; // error
+ } finally {
+ // On failure, release everything opened for this attempt.
+ if (!result) {
+ IOUtils.closeSocket(s);
+ s = null;
+ IOUtils.closeStream(out);
+ out = null;
+ IOUtils.closeStream(blockReplyStream);
+ blockReplyStream = null;
+ }
+ }
+ return result;
+ }
+ }
+
+ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
+ if (favoredNodes == null) {
+ return null;
+ } else {
+ boolean[] pinnings = new boolean[nodes.length];
+ HashSet<String> favoredSet =
+ new HashSet<String>(Arrays.asList(favoredNodes));
+ for (int i = 0; i < nodes.length; i++) {
+ pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(nodes[i].getXferAddrWithHostname() +
+ " was chosen by name node (favored=" + pinnings[i] + ").");
+ }
+ }
+ if (shouldLog && !favoredSet.isEmpty()) {
+ // There is one or more favored nodes that were not allocated.
+ LOG.warn("These favored nodes were specified but not chosen: "
+ + favoredSet + " Specified favored nodes: "
+ + Arrays.toString(favoredNodes));
+
+ }
+ return pinnings;
+ }
+ }
+
+  /**
+   * Asks the namenode to allocate the block that follows {@code block}.
+   *
+   * A NotReplicatedYetException is retried up to the configured number of
+   * times, sleeping between attempts; the delay starts at the configured
+   * initial value and doubles after each sleep. File-not-found, access
+   * control, quota and unresolved-path errors are unwrapped and thrown at
+   * once; any other remote error is rethrown unchanged.
+   *
+   * @param excludedNodes datanodes the namenode should not choose, or null
+   * @return the newly allocated block with its locations
+   * @throws IOException if allocation fails or the retries are exhausted
+   */
+  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+      throws IOException {
+    final DfsClientConf conf = dfsClient.getConf();
+    int retriesLeft = conf.getNumBlockWriteLocateFollowingRetry();
+    long backoffMs = conf.getBlockWriteLocateFollowingInitialDelayMs();
+    final long startTime = Time.monotonicNow();
+    for (;;) {
+      try {
+        return dfsClient.namenode.addBlock(src, dfsClient.clientName,
+            block, excludedNodes, stat.getFileId(), favoredNodes);
+      } catch (RemoteException re) {
+        final IOException unwrapped =
+            re.unwrapRemoteException(FileNotFoundException.class,
+                AccessControlException.class,
+                NSQuotaExceededException.class,
+                DSQuotaExceededException.class,
+                QuotaByStorageTypeExceededException.class,
+                UnresolvedPathException.class);
+        if (unwrapped != re) {
+          throw unwrapped; // no need to retry these exceptions
+        }
+        final boolean notReplicatedYet = NotReplicatedYetException.class
+            .getName().equals(re.getClassName());
+        if (!notReplicatedYet || retriesLeft == 0) {
+          throw re;
+        }
+        --retriesLeft;
+        LOG.info("Exception while adding a block", re);
+        final long elapsedMs = Time.monotonicNow() - startTime;
+        if (elapsedMs > 5000) {
+          LOG.info("Waiting for replication for "
+              + (elapsedMs / 1000) + " seconds");
+        }
+        try {
+          LOG.warn("NotReplicatedYetException sleeping " + src
+              + " retries left " + retriesLeft);
+          Thread.sleep(backoffMs);
+          backoffMs *= 2;
+        } catch (InterruptedException ie) {
+          LOG.warn("Caught exception", ie);
+        }
+      }
+    }
+  }
+
+ /**
+ * This function sleeps for a certain amount of time when the writing
+ * pipeline is congested. The function calculates the time based on a
+ * decorrelated filter.
+ *
+ * @see
+ * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
+ * http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
+ */
+ private void backOffIfNecessary() throws InterruptedException {
+ int t = 0;
+ synchronized (congestedNodes) {
+ if (!congestedNodes.isEmpty()) {
+ StringBuilder sb = new StringBuilder("DataNode");
+ for (DatanodeInfo i : congestedNodes) {
+ sb.append(' ').append(i);
+ }
+ int range = Math.abs(lastCongestionBackoffTime * 3 -
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+ int base = Math.min(lastCongestionBackoffTime * 3,
+ CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+ t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
+ (int)(base + Math.random() * range));
+ lastCongestionBackoffTime = t;
+ sb.append(" are congested. Backing off for ").append(t).append(" ms");
+ LOG.info(sb.toString());
+ congestedNodes.clear();
+ }
+ }
+ if (t != 0) {
+ Thread.sleep(t);
+ }
+ }
+
+  /** @return the block this streamer is currently writing */
+  ExtendedBlock getBlock() {
+    return block;
+  }
+
+  /** @return the target datanodes of the current pipeline */
+  DatanodeInfo[] getNodes() {
+    return nodes;
+  }
+
+  /** @return the access token of the current block */
+  Token<BlockTokenIdentifier> getBlockToken() {
+    return accessToken;
+  }
+
+ /**
+ * Put a packet to the data queue
+ *
+ * @param packet the packet to be put into the data queued
+ */
+ void queuePacket(DFSPacket packet) {
+ synchronized (dataQueue) {
+ if (packet == null) return;
+ packet.addTraceParent(Trace.currentSpan());
+ dataQueue.addLast(packet);
+ lastQueuedSeqno = packet.getSeqno();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queued packet " + packet.getSeqno());
+ }
+ dataQueue.notifyAll();
+ }
+ }
+
+  /**
+   * Creates a heartbeat packet. Its header buffer is allocated directly with
+   * {@code new byte[]} so that creating a heartbeat never blocks.
+   */
+  private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
+    return new DFSPacket(new byte[PacketHeader.PKT_MAX_HEADER_LEN], 0, 0,
+        DFSPacket.HEART_BEAT_SEQNO, 0, false);
+  }
+
+ private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
+ long excludedNodesCacheExpiry) {
+ return CacheBuilder.newBuilder()
+ .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
+ .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
+ @Override
+ public void onRemoval(
+ RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
+ LOG.info("Removing node " + notification.getKey()
+ + " from the excluded nodes list");
+ }
+ }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
+ @Override
+ public DatanodeInfo load(DatanodeInfo key) throws Exception {
+ return key;
+ }
+ });
+ }
+
+ private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
+ System.arraycopy(srcs, 0, dsts, 0, skipIndex);
+ System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
+ }
+
+  /**
+   * @return the flag that, when set, requests persisting blocks on the
+   *     namenode at the next flush
+   */
+  AtomicBoolean getPersistBlocks() {
+    return persistBlocks;
+  }
+
+  /**
+   * Sets the append-chunk flag.
+   *
+   * @param appendChunk new value of the flag
+   */
+  void setAppendChunk(boolean appendChunk) {
+    this.appendChunk = appendChunk;
+  }
+
+  /** @return the current value of the append-chunk flag */
+  boolean getAppendChunk() {
+    return appendChunk;
+  }
+
+  /** @return the holder of the last exception seen by this streamer */
+  LastExceptionInStreamer getLastException() {
+    return lastException;
+  }
+
+  /** Drops the reference to the current socket without closing it. */
+  void setSocketToNull() {
+    this.s = null;
+  }
+
+  /**
+   * Returns the current sequence number, then advances it by one.
+   *
+   * @return the sequence number before the increment
+   */
+  long getAndIncCurrentSeqno() {
+    return currentSeqno++;
+  }
+
+  /** @return the sequence number of the most recently queued packet */
+  long getLastQueuedSeqno() {
+    return lastQueuedSeqno;
+  }
+
+  /** @return the number of bytes written to the current block so far */
+  long getBytesCurBlock() {
+    return bytesCurBlock;
+  }
+
+  /**
+   * Sets the number of bytes written to the current block.
+   *
+   * @param bytesCurBlock new byte count for the current block
+   */
+  void setBytesCurBlock(long bytesCurBlock) {
+    this.bytesCurBlock = bytesCurBlock;
+  }
+
+  /**
+   * Adds {@code len} bytes to the count for the current block.
+   *
+   * @param len number of bytes to add
+   */
+  void incBytesCurBlock(long len) {
+    bytesCurBlock += len;
+  }
+
+  /**
+   * Sets an artificial slowdown, used by unit tests.
+   *
+   * @param period length of the slowdown
+   */
+  void setArtificialSlowdown(long period) {
+    this.artificialSlowdown = period;
+  }
+
+  /** @return whether this streamer has been closed */
+  boolean streamerClosed() {
+    return streamerClosed;
+  }
+
+  /** Closes the current socket, if one is open. */
+  void closeSocket() throws IOException {
+    if (s == null) {
+      return;
+    }
+    s.close();
+  }
+
+  /** @return the local block (or "null") followed by "@" and the pipeline */
+  @Override
+  public String toString() {
+    final Object localBlock = (block == null) ? null : block.getLocalBlock();
+    return localBlock + "@" + Arrays.toString(getNodes());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsConfigurationLoader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsConfigurationLoader.java
new file mode 100644
index 0000000..4a84f06
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsConfigurationLoader.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Registers the default HDFS configuration resources, {@code hdfs-default.xml}
+ * followed by {@code hdfs-site.xml}, with {@link Configuration} when this
+ * class is initialized.
+ */
+@InterfaceAudience.Private
+class HdfsConfigurationLoader {
+
+  static {
+    // Resources are registered in the order listed here.
+    for (String resource : new String[] {"hdfs-default.xml", "hdfs-site.xml"}) {
+      Configuration.addDefaultResource(resource);
+    }
+  }
+
+  /**
+   * Ensures the default resources have been registered. Calling this forces
+   * class initialization, and the registration in the static initializer
+   * runs only once, so repeated calls are harmless.
+   */
+  public static void init() {
+    // Intentionally empty: invoking it is enough to trigger class init.
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
new file mode 100644
index 0000000..f03e179
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Creates {@link Peer} connections to remote DataNodes.
+ */
+@InterfaceAudience.Private
+public interface RemotePeerFactory {
+  /**
+   * Opens a new connection to a DataNode.
+   *
+   * @param addr the address to connect to
+   * @param blockToken token used during the optional SASL negotiation
+   * @param datanodeId ID of the destination DataNode
+   * @return a new {@link Peer} connected to {@code addr}
+   * @throws IOException if connecting, or creating the remote socket or
+   *     encrypted stream, fails
+   */
+  Peer newConnectedPeer(InetSocketAddress addr,
+      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+      throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
new file mode 100644
index 0000000..ec17bb8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Thrown when an unknown cipher suite is encountered.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class UnknownCipherSuiteException extends IOException {
+ public UnknownCipherSuiteException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
new file mode 100644
index 0000000..0aac8c8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UnknownCryptoProtocolVersionException extends IOException {
+ private static final long serialVersionUID = 8957192l;
+
+ public UnknownCryptoProtocolVersionException() {
+ super();
+ }
+
+ public UnknownCryptoProtocolVersionException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
new file mode 100644
index 0000000..2655c40
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttr.NameSpace;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@InterfaceAudience.Private
+public class XAttrHelper {
+
+ /**
+ * Build <code>XAttr</code> from xattr name with prefix.
+ */
+ public static XAttr buildXAttr(String name) {
+ return buildXAttr(name, null);
+ }
+
+ /**
+ * Build <code>XAttr</code> from name with prefix and value.
+ * Name can not be null. Value can be null. The name and prefix
+ * are validated.
+ * Both name and namespace are case sensitive.
+ */
+ public static XAttr buildXAttr(String name, byte[] value) {
+ Preconditions.checkNotNull(name, "XAttr name cannot be null.");
+
+ final int prefixIndex = name.indexOf(".");
+ if (prefixIndex < 3) {// Prefix length is at least 3.
+ throw new HadoopIllegalArgumentException("An XAttr name must be " +
+ "prefixed with user/trusted/security/system/raw, followed by a '.'");
+ } else if (prefixIndex == name.length() - 1) {
+ throw new HadoopIllegalArgumentException("XAttr name cannot be empty.");
+ }
+
+ NameSpace ns;
+ final String prefix = name.substring(0, prefixIndex);
+ if (StringUtils.equalsIgnoreCase(prefix, NameSpace.USER.toString())) {
+ ns = NameSpace.USER;
+ } else if (
+ StringUtils.equalsIgnoreCase(prefix, NameSpace.TRUSTED.toString())) {
+ ns = NameSpace.TRUSTED;
+ } else if (
+ StringUtils.equalsIgnoreCase(prefix, NameSpace.SYSTEM.toString())) {
+ ns = NameSpace.SYSTEM;
+ } else if (
+ StringUtils.equalsIgnoreCase(prefix, NameSpace.SECURITY.toString())) {
+ ns = NameSpace.SECURITY;
+ } else if (
+ StringUtils.equalsIgnoreCase(prefix, NameSpace.RAW.toString())) {
+ ns = NameSpace.RAW;
+ } else {
+ throw new HadoopIllegalArgumentException("An XAttr name must be " +
+ "prefixed with user/trusted/security/system/raw, followed by a '.'");
+ }
+ XAttr xAttr = (new XAttr.Builder()).setNameSpace(ns).setName(name.
+ substring(prefixIndex + 1)).setValue(value).build();
+
+ return xAttr;
+ }
+
+ /**
+ * Build xattr name with prefix as <code>XAttr</code> list.
+ */
+ public static List<XAttr> buildXAttrAsList(String name) {
+ XAttr xAttr = buildXAttr(name);
+ List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
+ xAttrs.add(xAttr);
+
+ return xAttrs;
+ }
+
+ /**
+ * Get value of first xattr from <code>XAttr</code> list
+ */
+ public static byte[] getFirstXAttrValue(List<XAttr> xAttrs) {
+ byte[] value = null;
+ XAttr xAttr = getFirstXAttr(xAttrs);
+ if (xAttr != null) {
+ value = xAttr.getValue();
+ if (value == null) {
+ value = new byte[0]; // xattr exists, but no value.
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Get first xattr from <code>XAttr</code> list
+ */
+ public static XAttr getFirstXAttr(List<XAttr> xAttrs) {
+ if (xAttrs != null && !xAttrs.isEmpty()) {
+ return xAttrs.get(0);
+ }
+
+ return null;
+ }
+
+ /**
+ * Build xattr map from <code>XAttr</code> list, the key is
+ * xattr name with prefix, and value is xattr value.
+ */
+ public static Map<String, byte[]> buildXAttrMap(List<XAttr> xAttrs) {
+ if (xAttrs == null) {
+ return null;
+ }
+ Map<String, byte[]> xAttrMap = Maps.newHashMap();
+ for (XAttr xAttr : xAttrs) {
+ String name = getPrefixedName(xAttr);
+ byte[] value = xAttr.getValue();
+ if (value == null) {
+ value = new byte[0];
+ }
+ xAttrMap.put(name, value);
+ }
+
+ return xAttrMap;
+ }
+
+ /**
+ * Get name with prefix from <code>XAttr</code>
+ */
+ public static String getPrefixedName(XAttr xAttr) {
+ if (xAttr == null) {
+ return null;
+ }
+
+ return getPrefixedName(xAttr.getNameSpace(), xAttr.getName());
+ }
+
+ public static String getPrefixedName(XAttr.NameSpace ns, String name) {
+ return StringUtils.toLowerCase(ns.toString()) + "." + name;
+ }
+
+ /**
+ * Build <code>XAttr</code> list from xattr name list.
+ */
+ public static List<XAttr> buildXAttrs(List<String> names) {
+ if (names == null || names.isEmpty()) {
+ throw new HadoopIllegalArgumentException("XAttr names can not be " +
+ "null or empty.");
+ }
+
+ List<XAttr> xAttrs = Lists.newArrayListWithCapacity(names.size());
+ for (String name : names) {
+ xAttrs.add(buildXAttr(name, null));
+ }
+ return xAttrs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
new file mode 100644
index 0000000..e8ac686
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The Hdfs implementation of {@link FSDataInputStream}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HdfsDataInputStream extends FSDataInputStream {
+ public HdfsDataInputStream(DFSInputStream in) throws IOException {
+ super(in);
+ }
+
+ public HdfsDataInputStream(CryptoInputStream in) throws IOException {
+ super(in);
+ Preconditions.checkArgument(in.getWrappedStream() instanceof DFSInputStream,
+ "CryptoInputStream should wrap a DFSInputStream");
+ }
+
+ private DFSInputStream getDFSInputStream() {
+ if (in instanceof CryptoInputStream) {
+ return (DFSInputStream) ((CryptoInputStream) in).getWrappedStream();
+ }
+ return (DFSInputStream) in;
+ }
+
+ /**
+ * Get a reference to the wrapped output stream. We always want to return the
+ * actual underlying InputStream, even when we're using a CryptoStream. e.g.
+ * in the delegated methods below.
+ *
+ * @return the underlying output stream
+ */
+ public InputStream getWrappedStream() {
+ return in;
+ }
+
+ /**
+ * Get the datanode from which the stream is currently reading.
+ */
+ public DatanodeInfo getCurrentDatanode() {
+ return getDFSInputStream().getCurrentDatanode();
+ }
+
+ /**
+ * Get the block containing the target position.
+ */
+ public ExtendedBlock getCurrentBlock() {
+ return getDFSInputStream().getCurrentBlock();
+ }
+
+ /**
+ * Get the collection of blocks that has already been located.
+ */
+ public List<LocatedBlock> getAllBlocks() throws IOException {
+ return getDFSInputStream().getAllBlocks();
+ }
+
+ /**
+ * Get the visible length of the file. It will include the length of the last
+ * block even if that is in UnderConstruction state.
+ *
+ * @return The visible length of the file.
+ */
+ public long getVisibleLength() throws IOException {
+ return getDFSInputStream().getFileLength();
+ }
+
+ /**
+ * Get statistics about the reads which this DFSInputStream has done.
+ * Note that because HdfsDataInputStream is buffered, these stats may
+ * be higher than you would expect just by adding up the number of
+ * bytes read through HdfsDataInputStream.
+ */
+ public DFSInputStream.ReadStatistics getReadStatistics() {
+ return getDFSInputStream().getReadStatistics();
+ }
+
+ public void clearReadStatistics() {
+ getDFSInputStream().clearReadStatistics();
+ }
+}