Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:02 UTC
[02/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
deleted file mode 100644
index 4a016bd..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ /dev/null
@@ -1,1903 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.NullScope;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceInfo;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-/*********************************************************************
- *
- * The DataStreamer class is responsible for sending data packets to the
- * datanodes in the pipeline. It retrieves a new blockid and block locations
- * from the namenode, and starts streaming packets to the pipeline of
- * Datanodes. Every packet has a sequence number associated with
- * it. When all the packets for a block are sent out and acks for each
- * of them are received, the DataStreamer closes the current block.
- *
- * The DataStreamer thread picks up a packet from the dataQueue, sends it to
- * the first datanode in the pipeline, and moves it from the dataQueue to the
- * ackQueue. The ResponseProcessor receives acks from the datanodes. When a
- * successful ack for a packet is received from all datanodes, the
- * ResponseProcessor removes the corresponding packet from the ackQueue.
- *
- * In case of error, all outstanding packets are moved from the ackQueue back
- * to the front of the dataQueue. A new pipeline is set up by eliminating the
- * bad datanode from the original pipeline. The DataStreamer then resumes
- * sending packets from the dataQueue.
- *
- *********************************************************************/
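(For illustration only, not part of this patch: a minimal, self-contained sketch of the two-queue hand-off the comment above describes. Packet is a hypothetical stand-in for DFSPacket, and the error path mirrors processDatanodeError() further down in the file.)

    import java.util.LinkedList;

    class QueueHandOffSketch {
      static class Packet { final long seqno; Packet(long s) { seqno = s; } }

      private final LinkedList<Packet> dataQueue = new LinkedList<>();
      private final LinkedList<Packet> ackQueue = new LinkedList<>();

      // Streamer side: take the next packet to send and park it on the
      // ackQueue until every datanode in the pipeline has acked it.
      Packet sendNext() {
        synchronized (dataQueue) {             // both queues share the dataQueue lock
          Packet p = dataQueue.removeFirst();  // assumes dataQueue is non-empty
          ackQueue.addLast(p);
          dataQueue.notifyAll();
          return p;                            // caller writes p to the first datanode
        }
      }

      // Responder side: a fully acked packet is retired from the ackQueue.
      void onAck(long seqno) {
        synchronized (dataQueue) {
          if (!ackQueue.isEmpty() && ackQueue.getFirst().seqno == seqno) {
            ackQueue.removeFirst();
            dataQueue.notifyAll();             // wake writers blocked on queue space
          }
        }
      }

      // Error path: un-acked packets go back to the front of the dataQueue so
      // a rebuilt pipeline can resend them.
      void onError() {
        synchronized (dataQueue) {
          dataQueue.addAll(0, ackQueue);
          ackQueue.clear();
        }
      }
    }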
-
-@InterfaceAudience.Private
-class DataStreamer extends Daemon {
- static final Log LOG = LogFactory.getLog(DataStreamer.class);
-
- /**
- * Create a socket for a write pipeline
- *
- * @param first the first datanode
- * @param length the pipeline length
- * @param client client
- * @return the socket connected to the first datanode
- */
- static Socket createSocketForPipeline(final DatanodeInfo first,
- final int length, final DFSClient client) throws IOException {
- final DfsClientConf conf = client.getConf();
- final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to datanode " + dnAddr);
- }
- final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
- final Socket sock = client.socketFactory.createSocket();
- final int timeout = client.getDatanodeReadTimeout(length);
- NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
- sock.setSoTimeout(timeout);
- sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send buf size " + sock.getSendBufferSize());
- }
- return sock;
- }
-
- /**
- * Check whether this file is lazy persist.
- *
- * @param stat the HdfsFileStatus of a file
- * @return true if this file is lazy persist
- */
- static boolean isLazyPersist(HdfsFileStatus stat) {
- return stat.getStoragePolicy() == HdfsConstants.MEMORY_STORAGE_POLICY_ID;
- }
-
- /**
- * release a list of packets to ByteArrayManager
- *
- * @param packets the packets to be released
- * @param bam ByteArrayManager
- */
- private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
- for(DFSPacket p : packets) {
- p.releaseBuffer(bam);
- }
- packets.clear();
- }
-
- static class LastExceptionInStreamer {
- private IOException thrown;
-
- synchronized void set(Throwable t) {
- assert t != null;
- this.thrown = t instanceof IOException ?
- (IOException) t : new IOException(t);
- }
-
- synchronized void clear() {
- thrown = null;
- }
-
- /** Check if there already is an exception. */
- synchronized void check(boolean resetToNull) throws IOException {
- if (thrown != null) {
- if (LOG.isTraceEnabled()) {
- // wrap and print the exception to know when the check is called
- LOG.trace("Got Exception while checking", new Throwable(thrown));
- }
- final IOException e = thrown;
- if (resetToNull) {
- thrown = null;
- }
- throw e;
- }
- }
-
- synchronized void throwException4Close() throws IOException {
- check(false);
- throw new ClosedChannelException();
- }
- }
-
- static class ErrorState {
- private boolean error = false;
- private int badNodeIndex = -1;
- private int restartingNodeIndex = -1;
- private long restartingNodeDeadline = 0;
- private final long datanodeRestartTimeout;
-
- ErrorState(long datanodeRestartTimeout) {
- this.datanodeRestartTimeout = datanodeRestartTimeout;
- }
-
- synchronized void reset() {
- error = false;
- badNodeIndex = -1;
- restartingNodeIndex = -1;
- restartingNodeDeadline = 0;
- }
-
- synchronized boolean hasError() {
- return error;
- }
-
- synchronized boolean hasDatanodeError() {
- return error && isNodeMarked();
- }
-
- synchronized void setError(boolean err) {
- this.error = err;
- }
-
- synchronized void setBadNodeIndex(int index) {
- this.badNodeIndex = index;
- }
-
- synchronized int getBadNodeIndex() {
- return badNodeIndex;
- }
-
- synchronized int getRestartingNodeIndex() {
- return restartingNodeIndex;
- }
-
- synchronized void initRestartingNode(int i, String message) {
- restartingNodeIndex = i;
- restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
- // If the data streamer has already set the primary node
- // bad, clear it. It is likely that the write failed due to
- // the DN shutdown. Even if it was a real failure, the pipeline
- // recovery will take care of it.
- badNodeIndex = -1;
- LOG.info(message);
- }
-
- synchronized boolean isRestartingNode() {
- return restartingNodeIndex >= 0;
- }
-
- synchronized boolean isNodeMarked() {
- return badNodeIndex >= 0 || isRestartingNode();
- }
-
- /**
- * This method is used when no explicit error report was received, but
- * something failed. Since the first node is the most likely suspect when
- * the cause is unclear, it is marked as failed.
- */
- synchronized void markFirstNodeIfNotMarked() {
- // There should be no existing error and no ongoing restart.
- if (!isNodeMarked()) {
- badNodeIndex = 0;
- }
- }
-
- synchronized void adjustState4RestartingNode() {
- // Just took care of a node error while waiting for a node restart
- if (restartingNodeIndex >= 0) {
- // If the error came from a node further away than the restarting
- // node, the restart must have been complete.
- if (badNodeIndex > restartingNodeIndex) {
- restartingNodeIndex = -1;
- } else if (badNodeIndex < restartingNodeIndex) {
- // the node index has shifted.
- restartingNodeIndex--;
- } else {
- throw new IllegalStateException("badNodeIndex = " + badNodeIndex
- + " = restartingNodeIndex = " + restartingNodeIndex);
- }
- }
-
- if (!isRestartingNode()) {
- error = false;
- }
- badNodeIndex = -1;
- }
-
- synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) {
- if (restartingNodeIndex >= 0) {
- if (!error) {
- throw new IllegalStateException("error=false while checking" +
- " restarting node deadline");
- }
-
- // check badNodeIndex
- if (badNodeIndex == restartingNodeIndex) {
- // ignore, if came from the restarting node
- badNodeIndex = -1;
- }
- // not within the deadline
- if (Time.monotonicNow() >= restartingNodeDeadline) {
- // expired. declare the restarting node dead
- restartingNodeDeadline = 0;
- final int i = restartingNodeIndex;
- restartingNodeIndex = -1;
- LOG.warn("Datanode " + i + " did not restart within "
- + datanodeRestartTimeout + "ms: " + nodes[i]);
- // Mark the restarting node as failed. If there is any other failed
- // node during the last pipeline construction attempt, it will not be
- // overwritten/dropped. In this case, the restarting node will get
- // excluded in the following attempt, if it still does not come up.
- if (badNodeIndex == -1) {
- badNodeIndex = i;
- }
- }
- }
- }
- }
-
- private volatile boolean streamerClosed = false;
- private ExtendedBlock block; // its length is number of bytes acked
- private Token<BlockTokenIdentifier> accessToken;
- private DataOutputStream blockStream;
- private DataInputStream blockReplyStream;
- private ResponseProcessor response = null;
- private volatile DatanodeInfo[] nodes = null; // list of targets for current block
- private volatile StorageType[] storageTypes = null;
- private volatile String[] storageIDs = null;
- private final ErrorState errorState;
-
- private BlockConstructionStage stage; // block construction stage
- private long bytesSent = 0; // number of bytes that've been sent
- private final boolean isLazyPersistFile;
-
- /** Nodes have been used in the pipeline before and have failed. */
- private final List<DatanodeInfo> failed = new ArrayList<>();
- /** The last ack sequence number before pipeline failure. */
- private long lastAckedSeqnoBeforeFailure = -1;
- private int pipelineRecoveryCount = 0;
- /** Has the current block been hflushed? */
- private boolean isHflushed = false;
- /** Append on an existing block? */
- private final boolean isAppend;
-
- private long currentSeqno = 0;
- private long lastQueuedSeqno = -1;
- private long lastAckedSeqno = -1;
- private long bytesCurBlock = 0; // bytes written in current block
- private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
- private Socket s;
-
- private final DFSClient dfsClient;
- private final String src;
- /** Only for DataTransferProtocol.writeBlock(..) */
- private final DataChecksum checksum4WriteBlock;
- private final Progressable progress;
- private final HdfsFileStatus stat;
- // appending to existing partial block
- private volatile boolean appendChunk = false;
- // both dataQueue and ackQueue are protected by dataQueue lock
- private final LinkedList<DFSPacket> dataQueue = new LinkedList<>();
- private final LinkedList<DFSPacket> ackQueue = new LinkedList<>();
- private final AtomicReference<CachingStrategy> cachingStrategy;
- private final ByteArrayManager byteArrayManager;
- //persist blocks on namenode
- private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
- private boolean failPacket = false;
- private final long dfsclientSlowLogThresholdMs;
- private long artificialSlowdown = 0;
- // List of congested data nodes. The stream will back off if the DataNodes
- // are congested
- private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
- private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
- private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
- CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
- private int lastCongestionBackoffTime;
-
- private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
- private final String[] favoredNodes;
-
- private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
- Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage,
- boolean isAppend, String[] favoredNodes) {
- this.dfsClient = dfsClient;
- this.src = src;
- this.progress = progress;
- this.stat = stat;
- this.checksum4WriteBlock = checksum;
- this.cachingStrategy = cachingStrategy;
- this.byteArrayManager = byteArrayManage;
- this.isLazyPersistFile = isLazyPersist(stat);
- this.isAppend = isAppend;
- this.favoredNodes = favoredNodes;
-
- final DfsClientConf conf = dfsClient.getConf();
- this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs();
- this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
- this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
- }
-
- /**
- * construction with tracing info
- */
- DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
- String src, Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage, String[] favoredNodes) {
- this(stat, dfsClient, src, progress, checksum, cachingStrategy,
- byteArrayManage, false, favoredNodes);
- this.block = block;
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
-
- /**
- * Construct a data streamer for appending to the last partial block
- * @param lastBlock last block of the file to be appended
- * @param stat status of the file to be appended
- * @throws IOException if error occurs
- */
- DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient,
- String src, Progressable progress, DataChecksum checksum,
- AtomicReference<CachingStrategy> cachingStrategy,
- ByteArrayManager byteArrayManage) throws IOException {
- this(stat, dfsClient, src, progress, checksum, cachingStrategy,
- byteArrayManage, true, null);
- stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
- block = lastBlock.getBlock();
- bytesSent = block.getNumBytes();
- accessToken = lastBlock.getBlockToken();
- }
-
- /**
- * Set pipeline in construction
- *
- * @param lastBlock the last block of a file
- * @throws IOException
- */
- void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{
- // setup pipeline to append to the last block XXX retries??
- setPipeline(lastBlock);
- if (nodes.length < 1) {
- throw new IOException("Unable to retrieve blocks locations " +
- " for last block " + block +
- "of file " + src);
- }
- }
-
- private void setPipeline(LocatedBlock lb) {
- setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
- }
-
- private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
- String[] storageIDs) {
- this.nodes = nodes;
- this.storageTypes = storageTypes;
- this.storageIDs = storageIDs;
- }
-
- /**
- * Initialize for data streaming
- */
- private void initDataStreaming() {
- this.setName("DataStreamer for file " + src +
- " block " + block);
- response = new ResponseProcessor(nodes);
- response.start();
- stage = BlockConstructionStage.DATA_STREAMING;
- }
-
- private void endBlock() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Closing old block " + block);
- }
- this.setName("DataStreamer for file " + src);
- closeResponder();
- closeStream();
- setPipeline(null, null, null);
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
-
- private boolean shouldStop() {
- return streamerClosed || errorState.hasError() || !dfsClient.clientRunning;
- }
-
- /*
- * streamer thread is the only thread that opens streams to datanode,
- * and closes them. Any error recovery is also done by this thread.
- */
- @Override
- public void run() {
- long lastPacket = Time.monotonicNow();
- TraceScope scope = NullScope.INSTANCE;
- while (!streamerClosed && dfsClient.clientRunning) {
- // if the Responder encountered an error, shutdown Responder
- if (errorState.hasError() && response != null) {
- try {
- response.close();
- response.join();
- response = null;
- } catch (InterruptedException e) {
- LOG.warn("Caught exception", e);
- }
- }
-
- DFSPacket one;
- try {
- // process datanode IO errors if any
- boolean doSleep = processDatanodeError();
-
- final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
- synchronized (dataQueue) {
- // wait for a packet to be sent.
- long now = Time.monotonicNow();
- while ((!shouldStop() && dataQueue.size() == 0 &&
- (stage != BlockConstructionStage.DATA_STREAMING ||
- stage == BlockConstructionStage.DATA_STREAMING &&
- now - lastPacket < halfSocketTimeout)) || doSleep ) {
- long timeout = halfSocketTimeout - (now-lastPacket);
- timeout = timeout <= 0 ? 1000 : timeout;
- timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
- timeout : 1000;
- try {
- dataQueue.wait(timeout);
- } catch (InterruptedException e) {
- LOG.warn("Caught exception", e);
- }
- doSleep = false;
- now = Time.monotonicNow();
- }
- if (shouldStop()) {
- continue;
- }
- // get packet to be sent.
- if (dataQueue.isEmpty()) {
- one = createHeartbeatPacket();
- } else {
- try {
- backOffIfNecessary();
- } catch (InterruptedException e) {
- LOG.warn("Caught exception", e);
- }
- one = dataQueue.getFirst(); // regular data packet
- long parents[] = one.getTraceParents();
- if (parents.length > 0) {
- scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
- // TODO: use setParents API once it's available from HTrace 3.2
- // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
- // scope.getSpan().setParents(parents);
- }
- }
- }
-
- // get new block from namenode.
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Allocating new block");
- }
- setPipeline(nextBlockOutputStream());
- initDataStreaming();
- } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Append to block " + block);
- }
- setupPipelineForAppendOrRecovery();
- if (streamerClosed) {
- continue;
- }
- initDataStreaming();
- }
-
- long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
- if (lastByteOffsetInBlock > stat.getBlockSize()) {
- throw new IOException("BlockSize " + stat.getBlockSize() +
- " is smaller than data size. " +
- " Offset of packet in block " +
- lastByteOffsetInBlock +
- " Aborting file " + src);
- }
-
- if (one.isLastPacketInBlock()) {
- // wait for all data packets to be successfully acked
- synchronized (dataQueue) {
- while (!shouldStop() && ackQueue.size() != 0) {
- try {
- // wait for acks to arrive from datanodes
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
- LOG.warn("Caught exception", e);
- }
- }
- }
- if (shouldStop()) {
- continue;
- }
- stage = BlockConstructionStage.PIPELINE_CLOSE;
- }
-
- // send the packet
- Span span = null;
- synchronized (dataQueue) {
- // move packet from dataQueue to ackQueue
- if (!one.isHeartbeatPacket()) {
- span = scope.detach();
- one.setTraceSpan(span);
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataStreamer block " + block +
- " sending packet " + one);
- }
-
- // write out data to remote datanode
- TraceScope writeScope = Trace.startSpan("writeTo", span);
- try {
- one.writeTo(blockStream);
- blockStream.flush();
- } catch (IOException e) {
- // HDFS-3398: treat the primary DN as down since the client is unable
- // to write to it. If a failed or restarting node has already
- // been recorded by the responder, the following call will have no
- // effect. Pipeline recovery can handle only one node error at a
- // time. If the primary node fails again during the recovery, it
- // will be taken out then.
- errorState.markFirstNodeIfNotMarked();
- throw e;
- } finally {
- writeScope.close();
- }
- lastPacket = Time.monotonicNow();
-
- // update bytesSent
- long tmpBytesSent = one.getLastByteOffsetBlock();
- if (bytesSent < tmpBytesSent) {
- bytesSent = tmpBytesSent;
- }
-
- if (shouldStop()) {
- continue;
- }
-
- // Is this block full?
- if (one.isLastPacketInBlock()) {
- // wait for the close packet to be acked
- synchronized (dataQueue) {
- while (!shouldStop() && ackQueue.size() != 0) {
- dataQueue.wait(1000);// wait for acks to arrive from datanodes
- }
- }
- if (shouldStop()) {
- continue;
- }
-
- endBlock();
- }
- if (progress != null) { progress.progress(); }
-
- // This is used by unit test to trigger race conditions.
- if (artificialSlowdown != 0 && dfsClient.clientRunning) {
- Thread.sleep(artificialSlowdown);
- }
- } catch (Throwable e) {
- // Log warning if there was a real error.
- if (!errorState.isRestartingNode()) {
- // Since their messages are descriptive enough, do not always
- // log a verbose stack-trace WARN for quota exceptions.
- if (e instanceof QuotaExceededException) {
- LOG.debug("DataStreamer Quota Exception", e);
- } else {
- LOG.warn("DataStreamer Exception", e);
- }
- }
- lastException.set(e);
- assert !(e instanceof NullPointerException);
- errorState.setError(true);
- if (!errorState.isNodeMarked()) {
- // Not a datanode issue
- streamerClosed = true;
- }
- } finally {
- scope.close();
- }
- }
- closeInternal();
- }
-
- private void closeInternal() {
- closeResponder(); // close and join
- closeStream();
- streamerClosed = true;
- release();
- synchronized (dataQueue) {
- dataQueue.notifyAll();
- }
- }
-
- /**
- * release the DFSPackets in the two queues
- *
- */
- void release() {
- synchronized (dataQueue) {
- releaseBuffer(dataQueue, byteArrayManager);
- releaseBuffer(ackQueue, byteArrayManager);
- }
- }
-
- /**
- * wait for the ack of seqno
- *
- * @param seqno the sequence number to be acked
- * @throws IOException
- */
- void waitForAckedSeqno(long seqno) throws IOException {
- TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for ack for: " + seqno);
- }
- long begin = Time.monotonicNow();
- try {
- synchronized (dataQueue) {
- while (!streamerClosed) {
- checkClosed();
- if (lastAckedSeqno >= seqno) {
- break;
- }
- try {
- dataQueue.wait(1000); // when we receive an ack, we notify on
- // dataQueue
- } catch (InterruptedException ie) {
- throw new InterruptedIOException(
- "Interrupted while waiting for data to be acknowledged by pipeline");
- }
- }
- }
- checkClosed();
- } catch (ClosedChannelException e) {
- }
- long duration = Time.monotonicNow() - begin;
- if (duration > dfsclientSlowLogThresholdMs) {
- LOG.warn("Slow waitForAckedSeqno took " + duration
- + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
- }
- } finally {
- scope.close();
- }
- }
-
- /**
- * wait for space of dataQueue and queue the packet
- *
- * @param packet the DFSPacket to be queued
- * @throws IOException
- */
- void waitAndQueuePacket(DFSPacket packet) throws IOException {
- synchronized (dataQueue) {
- try {
- // If queue is full, then wait till we have enough space
- boolean firstWait = true;
- try {
- while (!streamerClosed && dataQueue.size() + ackQueue.size() >
- dfsClient.getConf().getWriteMaxPackets()) {
- if (firstWait) {
- Span span = Trace.currentSpan();
- if (span != null) {
- span.addTimelineAnnotation("dataQueue.wait");
- }
- firstWait = false;
- }
- try {
- dataQueue.wait();
- } catch (InterruptedException e) {
- // If we get interrupted while waiting to queue data, we still need to get rid
- // of the current packet. This is because we have an invariant that if
- // currentPacket gets full, it will get queued before the next writeChunk.
- //
- // Rather than wait around for space in the queue, we should instead try to
- // return to the caller as soon as possible, even though we slightly overrun
- // the MAX_PACKETS length.
- Thread.currentThread().interrupt();
- break;
- }
- }
- } finally {
- Span span = Trace.currentSpan();
- if ((span != null) && (!firstWait)) {
- span.addTimelineAnnotation("end.wait");
- }
- }
- checkClosed();
- queuePacket(packet);
- } catch (ClosedChannelException e) {
- }
- }
- }
-
- /*
- * close the streamer, should be called only by an external thread
- * and only after all data to be sent has been flushed to datanode.
- *
- * Interrupt this data streamer if force is true
- *
- * @param force if this data stream is forced to be closed
- */
- void close(boolean force) {
- streamerClosed = true;
- synchronized (dataQueue) {
- dataQueue.notifyAll();
- }
- if (force) {
- this.interrupt();
- }
- }
-
-
- private void checkClosed() throws IOException {
- if (streamerClosed) {
- lastException.throwException4Close();
- }
- }
-
- private void closeResponder() {
- if (response != null) {
- try {
- response.close();
- response.join();
- } catch (InterruptedException e) {
- LOG.warn("Caught exception", e);
- } finally {
- response = null;
- }
- }
- }
-
- private void closeStream() {
- final MultipleIOException.Builder b = new MultipleIOException.Builder();
-
- if (blockStream != null) {
- try {
- blockStream.close();
- } catch (IOException e) {
- b.add(e);
- } finally {
- blockStream = null;
- }
- }
- if (blockReplyStream != null) {
- try {
- blockReplyStream.close();
- } catch (IOException e) {
- b.add(e);
- } finally {
- blockReplyStream = null;
- }
- }
- if (null != s) {
- try {
- s.close();
- } catch (IOException e) {
- b.add(e);
- } finally {
- s = null;
- }
- }
-
- final IOException ioe = b.build();
- if (ioe != null) {
- lastException.set(ioe);
- }
- }
-
- /**
- * Examine whether it is worth waiting for a node to restart.
- * @param index the node index
- */
- boolean shouldWaitForRestart(int index) {
- // Only one node in the pipeline.
- if (nodes.length == 1) {
- return true;
- }
-
- // Is it a local node?
- InetAddress addr = null;
- try {
- addr = InetAddress.getByName(nodes[index].getIpAddr());
- } catch (java.net.UnknownHostException e) {
- // we are passing an ip address. this should not happen.
- assert false;
- }
-
- if (addr != null && NetUtils.isLocalAddress(addr)) {
- return true;
- }
- return false;
- }
-
- //
- // Processes responses from the datanodes. A packet is removed
- // from the ackQueue when its response arrives.
- //
- private class ResponseProcessor extends Daemon {
-
- private volatile boolean responderClosed = false;
- private DatanodeInfo[] targets = null;
- private boolean isLastPacketInBlock = false;
-
- ResponseProcessor (DatanodeInfo[] targets) {
- this.targets = targets;
- }
-
- @Override
- public void run() {
-
- setName("ResponseProcessor for block " + block);
- PipelineAck ack = new PipelineAck();
-
- TraceScope scope = NullScope.INSTANCE;
- while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
- // process responses from datanodes.
- try {
- // read an ack from the pipeline
- long begin = Time.monotonicNow();
- ack.readFields(blockReplyStream);
- long duration = Time.monotonicNow() - begin;
- if (duration > dfsclientSlowLogThresholdMs
- && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
- LOG.warn("Slow ReadProcessor read fields took " + duration
- + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
- + ack + ", targets: " + Arrays.asList(targets));
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient " + ack);
- }
-
- long seqno = ack.getSeqno();
- // processes response status from datanodes.
- ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
- for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
- final Status reply = PipelineAck.getStatusFromHeader(ack
- .getHeaderFlag(i));
- if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
- PipelineAck.ECN.CONGESTED) {
- congestedNodesFromAck.add(targets[i]);
- }
- // Restart will not be treated differently unless it is
- // the local node or the only one in the pipeline.
- if (PipelineAck.isRestartOOBStatus(reply) &&
- shouldWaitForRestart(i)) {
- final String message = "Datanode " + i + " is restarting: "
- + targets[i];
- errorState.initRestartingNode(i, message);
- throw new IOException(message);
- }
- // node error
- if (reply != SUCCESS) {
- errorState.setBadNodeIndex(i); // mark bad datanode
- throw new IOException("Bad response " + reply +
- " for " + block + " from datanode " + targets[i]);
- }
- }
-
- if (!congestedNodesFromAck.isEmpty()) {
- synchronized (congestedNodes) {
- congestedNodes.clear();
- congestedNodes.addAll(congestedNodesFromAck);
- }
- } else {
- synchronized (congestedNodes) {
- congestedNodes.clear();
- lastCongestionBackoffTime = 0;
- }
- }
-
- assert seqno != PipelineAck.UNKOWN_SEQNO :
- "Ack for unknown seqno should be a failed ack: " + ack;
- if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
- continue;
- }
-
- // a success ack for a data packet
- DFSPacket one;
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- if (one.getSeqno() != seqno) {
- throw new IOException("ResponseProcessor: Expecting seqno " +
- " for block " + block +
- one.getSeqno() + " but received " + seqno);
- }
- isLastPacketInBlock = one.isLastPacketInBlock();
-
- // Fail the packet write for testing in order to force a
- // pipeline recovery.
- if (DFSClientFaultInjector.get().failPacket() &&
- isLastPacketInBlock) {
- failPacket = true;
- throw new IOException(
- "Failing the last packet for testing.");
- }
-
- // update bytesAcked
- block.setNumBytes(one.getLastByteOffsetBlock());
-
- synchronized (dataQueue) {
- scope = Trace.continueSpan(one.getTraceSpan());
- one.setTraceSpan(null);
- lastAckedSeqno = seqno;
- ackQueue.removeFirst();
- dataQueue.notifyAll();
-
- one.releaseBuffer(byteArrayManager);
- }
- } catch (Exception e) {
- if (!responderClosed) {
- lastException.set(e);
- errorState.setError(true);
- errorState.markFirstNodeIfNotMarked();
- synchronized (dataQueue) {
- dataQueue.notifyAll();
- }
- if (!errorState.isRestartingNode()) {
- LOG.warn("Exception for " + block, e);
- }
- responderClosed = true;
- }
- } finally {
- scope.close();
- }
- }
- }
-
- void close() {
- responderClosed = true;
- this.interrupt();
- }
- }
-
- /**
- * If this stream has encountered any errors, shutdown threads
- * and mark the stream as closed.
- *
- * @return true if it should sleep for a while after returning.
- */
- private boolean processDatanodeError() throws IOException {
- if (!errorState.hasDatanodeError()) {
- return false;
- }
- if (response != null) {
- LOG.info("Error Recovery for " + block +
- " waiting for responder to exit. ");
- return true;
- }
- closeStream();
-
- // move packets from ack queue to front of the data queue
- synchronized (dataQueue) {
- dataQueue.addAll(0, ackQueue);
- ackQueue.clear();
- }
-
- // Record the new pipeline failure recovery.
- if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
- lastAckedSeqnoBeforeFailure = lastAckedSeqno;
- pipelineRecoveryCount = 1;
- } else {
- // If we had to recover the pipeline five times in a row for the
- // same packet, this client likely has corrupt data or is corrupting
- // it during transmission.
- if (++pipelineRecoveryCount > 5) {
- LOG.warn("Error recovering pipeline for writing " +
- block + ". Already retried 5 times for the same packet.");
- lastException.set(new IOException("Failing write. Tried pipeline " +
- "recovery 5 times without success."));
- streamerClosed = true;
- return false;
- }
- }
- boolean doSleep = setupPipelineForAppendOrRecovery();
-
- if (!streamerClosed && dfsClient.clientRunning) {
- if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
-
- // If we had an error while closing the pipeline, we go through a fast-path
- // where the BlockReceiver does not run. Instead, the DataNode just finalizes
- // the block immediately during the 'connect ack' process. So, we want to pull
- // the end-of-block packet from the dataQueue, since we don't actually have
- // a true pipeline to send it over.
- //
- // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
- // a client waiting on close() will be aware that the flush finished.
- synchronized (dataQueue) {
- DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet
- Span span = endOfBlockPacket.getTraceSpan();
- if (span != null) {
- // Close any trace span associated with this Packet
- TraceScope scope = Trace.continueSpan(span);
- scope.close();
- }
- assert endOfBlockPacket.isLastPacketInBlock();
- assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
- lastAckedSeqno = endOfBlockPacket.getSeqno();
- dataQueue.notifyAll();
- }
- endBlock();
- } else {
- initDataStreaming();
- }
- }
-
- return doSleep;
- }
-
- void setHflush() {
- isHflushed = true;
- }
-
- private int findNewDatanode(final DatanodeInfo[] original
- ) throws IOException {
- if (nodes.length != original.length + 1) {
- throw new IOException(
- new StringBuilder()
- .append("Failed to replace a bad datanode on the existing pipeline ")
- .append("due to no more good datanodes being available to try. ")
- .append("(Nodes: current=").append(Arrays.asList(nodes))
- .append(", original=").append(Arrays.asList(original)).append("). ")
- .append("The current failed datanode replacement policy is ")
- .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
- .append("a client may configure this via '")
- .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY)
- .append("' in its configuration.")
- .toString());
- }
- for(int i = 0; i < nodes.length; i++) {
- int j = 0;
- for(; j < original.length && !nodes[i].equals(original[j]); j++);
- if (j == original.length) {
- return i;
- }
- }
- throw new IOException("Failed: new datanode not found: nodes="
- + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
- }
-
- private void addDatanode2ExistingPipeline() throws IOException {
- if (DataTransferProtocol.LOG.isDebugEnabled()) {
- DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
- }
- /*
- * Is data transfer necessary? We have the following cases.
- *
- * Case 1: Failure in Pipeline Setup
- * - Append
- * + Transfer the stored replica, which may be an RBW or a finalized replica.
- * - Create
- * + If no data, then no transfer is required.
- * + If data has been written, transfer the RBW. This case may happen
- * when there were streaming failures earlier in this pipeline.
- *
- * Case 2: Failure in Streaming
- * - Append/Create:
- * + transfer RBW
- *
- * Case 3: Failure in Close
- * - Append/Create:
- * + no transfer; let the NameNode replicate the block.
- */
- if (!isAppend && lastAckedSeqno < 0
- && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
- //no data have been written
- return;
- } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
- || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
- //pipeline is closing
- return;
- }
-
- //get a new datanode
- final DatanodeInfo[] original = nodes;
- final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
- src, stat.getFileId(), block, nodes, storageIDs,
- failed.toArray(new DatanodeInfo[failed.size()]),
- 1, dfsClient.clientName);
- setPipeline(lb);
-
- //find the new datanode
- final int d = findNewDatanode(original);
-
- //transfer replica
- final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
- final DatanodeInfo[] targets = {nodes[d]};
- final StorageType[] targetStorageTypes = {storageTypes[d]};
- transfer(src, targets, targetStorageTypes, lb.getBlockToken());
- }
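
(A hedged sketch, not part of the original patch: the early returns above distill to the following decision function. BlockConstructionStage is the same enum used throughout this file; the class and method names here are invented for illustration.)

    import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;

    final class TransferDecisionSketch {
      /** Whether a replica transfer is needed before adding a new datanode. */
      static boolean transferNeeded(boolean isAppend, long lastAckedSeqno,
                                    BlockConstructionStage stage) {
        if (!isAppend && lastAckedSeqno < 0
            && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          return false; // Case 1, create with no data written: nothing to copy
        }
        if (stage == BlockConstructionStage.PIPELINE_CLOSE
            || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
          return false; // Case 3, failure in close: the NameNode re-replicates
        }
        return true;    // Case 1 (append) and Case 2: transfer the RBW/finalized replica
      }
    }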
-
- private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
- final StorageType[] targetStorageTypes,
- final Token<BlockTokenIdentifier> blockToken) throws IOException {
- //transfer replica to the new datanode
- Socket sock = null;
- DataOutputStream out = null;
- DataInputStream in = null;
- try {
- sock = createSocketForPipeline(src, 2, dfsClient);
- final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
- final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
-
- OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
- IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
- unbufOut, unbufIn, dfsClient, blockToken, src);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
- in = new DataInputStream(unbufIn);
-
- //send the TRANSFER_BLOCK request
- new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
- targets, targetStorageTypes);
- out.flush();
-
- //ack
- BlockOpResponseProto response =
- BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in));
- if (SUCCESS != response.getStatus()) {
- throw new IOException("Failed to add a datanode");
- }
- } finally {
- IOUtils.closeStream(in);
- IOUtils.closeStream(out);
- IOUtils.closeSocket(sock);
- }
- }
-
- /**
- * Open a DataStreamer to a DataNode pipeline so that
- * it can be written to.
- * This happens when a file is appended or data streaming fails.
- * It keeps on trying until a pipeline is set up.
- */
- private boolean setupPipelineForAppendOrRecovery() throws IOException {
- // check number of datanodes
- if (nodes == null || nodes.length == 0) {
- String msg = "Could not get block locations. " + "Source file \""
- + src + "\" - Aborting...";
- LOG.warn(msg);
- lastException.set(new IOException(msg));
- streamerClosed = true;
- return false;
- }
-
- boolean success = false;
- long newGS = 0L;
- while (!success && !streamerClosed && dfsClient.clientRunning) {
- if (!handleRestartingDatanode()) {
- return false;
- }
-
- final boolean isRecovery = errorState.hasError();
- if (!handleBadDatanode()) {
- return false;
- }
-
- handleDatanodeReplacement();
-
- // get a new generation stamp and an access token
- final LocatedBlock lb = updateBlockForPipeline();
- newGS = lb.getBlock().getGenerationStamp();
- accessToken = lb.getBlockToken();
-
- // set up the pipeline again with the remaining nodes
- success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
-
- failPacket4Testing();
-
- errorState.checkRestartingNodeDeadline(nodes);
- } // while
-
- if (success) {
- block = updatePipeline(newGS);
- }
- return false; // do not sleep, continue processing
- }
-
- /**
- * Sleep if a node is restarting.
- * This process is repeated until the deadline or the node starts back up.
- * @return true if it should continue.
- */
- private boolean handleRestartingDatanode() {
- if (errorState.isRestartingNode()) {
- // 4 seconds or the configured deadline period, whichever is shorter.
- // This is the retry interval and recovery will be retried in this
- // interval until timeout or success.
- final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L);
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ie) {
- lastException.set(new IOException(
- "Interrupted while waiting for restarting "
- + nodes[errorState.getRestartingNodeIndex()]));
- streamerClosed = true;
- return false;
- }
- }
- return true;
- }
-
- /**
- * Remove bad node from list of nodes if badNodeIndex was set.
- * @return true if it should continue.
- */
- private boolean handleBadDatanode() {
- final int badNodeIndex = errorState.getBadNodeIndex();
- if (badNodeIndex >= 0) {
- if (nodes.length <= 1) {
- lastException.set(new IOException("All datanodes "
- + Arrays.toString(nodes) + " are bad. Aborting..."));
- streamerClosed = true;
- return false;
- }
-
- LOG.warn("Error Recovery for " + block + " in pipeline "
- + Arrays.toString(nodes) + ": datanode " + badNodeIndex
- + "("+ nodes[badNodeIndex] + ") is bad.");
- failed.add(nodes[badNodeIndex]);
-
- DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
- arraycopy(nodes, newnodes, badNodeIndex);
-
- final StorageType[] newStorageTypes = new StorageType[newnodes.length];
- arraycopy(storageTypes, newStorageTypes, badNodeIndex);
-
- final String[] newStorageIDs = new String[newnodes.length];
- arraycopy(storageIDs, newStorageIDs, badNodeIndex);
-
- setPipeline(newnodes, newStorageTypes, newStorageIDs);
-
- errorState.adjustState4RestartingNode();
- lastException.clear();
- }
- return true;
- }
-
- /** Add a datanode if replace-datanode policy is satisfied. */
- private void handleDatanodeReplacement() throws IOException {
- if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(),
- nodes, isAppend, isHflushed)) {
- try {
- addDatanode2ExistingPipeline();
- } catch(IOException ioe) {
- if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
- throw ioe;
- }
- LOG.warn("Failed to replace datanode."
- + " Continue with the remaining datanodes since "
- + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
- + " is set to true.", ioe);
- }
- }
- }
-
- private void failPacket4Testing() {
- if (failPacket) { // for testing
- failPacket = false;
- try {
- // Give DNs time to send in bad reports. In real situations,
- // good reports should follow bad ones, if client committed
- // with those nodes.
- Thread.sleep(2000);
- } catch (InterruptedException ie) {}
- }
- }
-
- LocatedBlock updateBlockForPipeline() throws IOException {
- return dfsClient.namenode.updateBlockForPipeline(
- block, dfsClient.clientName);
- }
-
- /** update pipeline at the namenode */
- ExtendedBlock updatePipeline(long newGS) throws IOException {
- final ExtendedBlock newBlock = new ExtendedBlock(
- block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
- dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
- nodes, storageIDs);
- return newBlock;
- }
-
- /**
- * Open a DataStreamer to a DataNode so that it can be written to.
- * This happens when a file is created and each time a new block is allocated.
- * Must get block ID and the IDs of the destinations from the namenode.
- * Returns the list of target datanodes.
- */
- private LocatedBlock nextBlockOutputStream() throws IOException {
- LocatedBlock lb = null;
- DatanodeInfo[] nodes = null;
- StorageType[] storageTypes = null;
- int count = dfsClient.getConf().getNumBlockWriteRetry();
- boolean success = false;
- ExtendedBlock oldBlock = block;
- do {
- errorState.reset();
- lastException.clear();
- success = false;
-
- DatanodeInfo[] excluded =
- excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
- .keySet()
- .toArray(new DatanodeInfo[0]);
- block = oldBlock;
- lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
- block = lb.getBlock();
- block.setNumBytes(0);
- bytesSent = 0;
- accessToken = lb.getBlockToken();
- nodes = lb.getLocations();
- storageTypes = lb.getStorageTypes();
-
- //
- // Connect to first DataNode in the list.
- //
- success = createBlockOutputStream(nodes, storageTypes, 0L, false);
-
- if (!success) {
- LOG.info("Abandoning " + block);
- dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
- dfsClient.clientName);
- block = null;
- final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
- LOG.info("Excluding datanode " + badNode);
- excludedNodes.put(badNode, badNode);
- }
- } while (!success && --count >= 0);
-
- if (!success) {
- throw new IOException("Unable to create new block.");
- }
- return lb;
- }
-
- // connects to the first datanode in the pipeline
- // Returns true if success, otherwise return failure.
- //
- private boolean createBlockOutputStream(DatanodeInfo[] nodes,
- StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
- if (nodes.length == 0) {
- LOG.info("nodes are empty for write pipeline of " + block);
- return false;
- }
- Status pipelineStatus = SUCCESS;
- String firstBadLink = "";
- boolean checkRestart = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("pipeline = " + Arrays.asList(nodes));
- }
-
- // persist blocks on namenode on next flush
- persistBlocks.set(true);
-
- int refetchEncryptionKey = 1;
- while (true) {
- boolean result = false;
- DataOutputStream out = null;
- try {
- assert null == s : "Previous socket unclosed";
- assert null == blockReplyStream : "Previous blockReplyStream unclosed";
- s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
- long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
- long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
-
- OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
- InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
- IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
- unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
- unbufOut = saslStreams.out;
- unbufIn = saslStreams.in;
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtil.getSmallBufferSize(dfsClient.getConfiguration())));
- blockReplyStream = new DataInputStream(unbufIn);
-
- //
- // Xmit header info to datanode
- //
-
- BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
-
- // We cannot change the block length in 'block' as it counts the number
- // of bytes ack'ed.
- ExtendedBlock blockCopy = new ExtendedBlock(block);
- blockCopy.setNumBytes(stat.getBlockSize());
-
- boolean[] targetPinnings = getPinnings(nodes, true);
- // send the request
- new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
- dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
- nodes.length, block.getNumBytes(), bytesSent, newGS,
- checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
- (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
-
- // receive ack for connect
- BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
- PBHelperClient.vintPrefixed(blockReplyStream));
- pipelineStatus = resp.getStatus();
- firstBadLink = resp.getFirstBadLink();
-
- // Got a restart OOB ack.
- // If a node is already restarting, this status is not likely from
- // the same node. If it is from a different node, it is not
- // from the local datanode. Thus it is safe to treat this as a
- // regular node error.
- if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
- !errorState.isRestartingNode()) {
- checkRestart = true;
- throw new IOException("A datanode is restarting.");
- }
-
- String logInfo = "ack with firstBadLink as " + firstBadLink;
- DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
-
- assert null == blockStream : "Previous blockStream unclosed";
- blockStream = out;
- result = true; // success
- errorState.reset();
- } catch (IOException ie) {
- if (!errorState.isRestartingNode()) {
- LOG.info("Exception in createBlockOutputStream", ie);
- }
- if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- LOG.info("Will fetch a new encryption key and retry, "
- + "encryption key was invalid when connecting to "
- + nodes[0] + " : " + ie);
- // The encryption key used is invalid.
- refetchEncryptionKey--;
- dfsClient.clearDataEncryptionKey();
- // Don't close the socket/exclude this node just yet. Try again with
- // a new encryption key.
- continue;
- }
-
- // find the datanode that matches
- if (firstBadLink.length() != 0) {
- for (int i = 0; i < nodes.length; i++) {
- // NB: Unconditionally using the xfer addr w/o hostname
- if (firstBadLink.equals(nodes[i].getXferAddr())) {
- errorState.setBadNodeIndex(i);
- break;
- }
- }
- } else {
- assert checkRestart == false;
- errorState.setBadNodeIndex(0);
- }
-
- final int i = errorState.getBadNodeIndex();
- // Check whether there is a restart worth waiting for.
- if (checkRestart && shouldWaitForRestart(i)) {
- errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]);
- }
- errorState.setError(true);
- lastException.set(ie);
- result = false; // error
- } finally {
- if (!result) {
- IOUtils.closeSocket(s);
- s = null;
- IOUtils.closeStream(out);
- out = null;
- IOUtils.closeStream(blockReplyStream);
- blockReplyStream = null;
- }
- }
- return result;
- }
- }
-
- private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
- if (favoredNodes == null) {
- return null;
- } else {
- boolean[] pinnings = new boolean[nodes.length];
- HashSet<String> favoredSet =
- new HashSet<String>(Arrays.asList(favoredNodes));
- for (int i = 0; i < nodes.length; i++) {
- pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
- if (LOG.isDebugEnabled()) {
- LOG.debug(nodes[i].getXferAddrWithHostname() +
- " was chosen by name node (favored=" + pinnings[i] + ").");
- }
- }
- if (shouldLog && !favoredSet.isEmpty()) {
- // There are one or more favored nodes that were not allocated.
- LOG.warn("These favored nodes were specified but not chosen: "
- + favoredSet + " Specified favored nodes: "
- + Arrays.toString(favoredNodes));
-
- }
- return pinnings;
- }
- }
-
- protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
- throws IOException {
- final DfsClientConf conf = dfsClient.getConf();
- int retries = conf.getNumBlockWriteLocateFollowingRetry();
- long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
- while (true) {
- long localstart = Time.monotonicNow();
- while (true) {
- try {
- return dfsClient.namenode.addBlock(src, dfsClient.clientName,
- block, excludedNodes, stat.getFileId(), favoredNodes);
- } catch (RemoteException e) {
- IOException ue =
- e.unwrapRemoteException(FileNotFoundException.class,
- AccessControlException.class,
- NSQuotaExceededException.class,
- DSQuotaExceededException.class,
- QuotaByStorageTypeExceededException.class,
- UnresolvedPathException.class);
- if (ue != e) {
- throw ue; // no need to retry these exceptions
- }
-
-
- if (NotReplicatedYetException.class.getName().
- equals(e.getClassName())) {
- if (retries == 0) {
- throw e;
- } else {
- --retries;
- LOG.info("Exception while adding a block", e);
- long elapsed = Time.monotonicNow() - localstart;
- if (elapsed > 5000) {
- LOG.info("Waiting for replication for "
- + (elapsed / 1000) + " seconds");
- }
- try {
- LOG.warn("NotReplicatedYetException sleeping " + src
- + " retries left " + retries);
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- } catch (InterruptedException ie) {
- LOG.warn("Caught exception", ie);
- }
- }
- } else {
- throw e;
- }
-
- }
- }
- }
- }
-
- /**
- * This function sleeps for a certain amount of time when the writing
- * pipeline is congested. The function calculates the time based on a
- * decorrelated filter.
- *
- * @see
- * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
- * http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
- */
- private void backOffIfNecessary() throws InterruptedException {
- int t = 0;
- synchronized (congestedNodes) {
- if (!congestedNodes.isEmpty()) {
- StringBuilder sb = new StringBuilder("DataNode");
- for (DatanodeInfo i : congestedNodes) {
- sb.append(' ').append(i);
- }
- int range = Math.abs(lastCongestionBackoffTime * 3 -
- CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
- int base = Math.min(lastCongestionBackoffTime * 3,
- CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
- t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
- (int)(base + Math.random() * range));
- lastCongestionBackoffTime = t;
- sb.append(" are congested. Backing off for ").append(t).append(" ms");
- LOG.info(sb.toString());
- congestedNodes.clear();
- }
- }
- if (t != 0) {
- Thread.sleep(t);
- }
- }
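
(The computation above follows the decorrelated-jitter recurrence from the linked article: each delay is drawn between min(3*prev, mean) and max(3*prev, mean), capped at ten times the mean. Below is a standalone sketch, not part of the patch, using the same constants; it can be run to see the spread of delays.)

    public class BackoffSketch {
      static final int MEAN_MS = 5000;          // CONGESTION_BACKOFF_MEAN_TIME_IN_MS
      static final int MAX_MS = MEAN_MS * 10;   // CONGESTION_BACK_OFF_MAX_TIME_IN_MS

      static int nextDelay(int prevDelayMs) {
        int base = Math.min(prevDelayMs * 3, MEAN_MS);
        int range = Math.abs(prevDelayMs * 3 - MEAN_MS);
        return Math.min(MAX_MS, (int) (base + Math.random() * range));
      }

      public static void main(String[] args) {
        int t = 0;
        for (int i = 1; i <= 5; i++) {
          t = nextDelay(t);
          System.out.println("backoff #" + i + ": " + t + " ms");
        }
        // The first delay lands in [0, 5000); later delays spread out but
        // never exceed 50,000 ms.
      }
    }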
-
- /**
- * get the block this streamer is writing to
- *
- * @return the block this streamer is writing to
- */
- ExtendedBlock getBlock() {
- return block;
- }
-
- /**
- * return the target datanodes in the pipeline
- *
- * @return the target datanodes in the pipeline
- */
- DatanodeInfo[] getNodes() {
- return nodes;
- }
-
- /**
- * return the token of the block
- *
- * @return the token of the block
- */
- Token<BlockTokenIdentifier> getBlockToken() {
- return accessToken;
- }
-
- /**
- * Put a packet to the data queue
- *
- * @param packet the packet to be put into the data queue
- */
- void queuePacket(DFSPacket packet) {
- synchronized (dataQueue) {
- if (packet == null) return;
- packet.addTraceParent(Trace.currentSpan());
- dataQueue.addLast(packet);
- lastQueuedSeqno = packet.getSeqno();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Queued packet " + packet.getSeqno());
- }
- dataQueue.notifyAll();
- }
- }
-
- /**
- * For heartbeat packets, create the buffer directly with new byte[]
- * since heartbeats should not be blocked.
- */
- private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
- final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
- return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false);
- }
-
- private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes(
- long excludedNodesCacheExpiry) {
- return CacheBuilder.newBuilder()
- .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS)
- .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
- @Override
- public void onRemoval(
- RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
- LOG.info("Removing node " + notification.getKey()
- + " from the excluded nodes list");
- }
- }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
- @Override
- public DatanodeInfo load(DatanodeInfo key) throws Exception {
- return key;
- }
- });
- }
-
- private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
- System.arraycopy(srcs, 0, dsts, 0, skipIndex);
- System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
- }
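
(A hypothetical usage, not in the patch, showing how the helper drops the element at skipIndex; this is how handleBadDatanode() shrinks the pipeline arrays.)

    String[] nodes = {"dn0", "dn1", "dn2"};
    String[] survivors = new String[nodes.length - 1];
    arraycopy(nodes, survivors, 1);   // survivors == {"dn0", "dn2"}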
-
- /**
- * check whether to persist blocks on the namenode
- *
- * @return whether to persist blocks on the namenode
- */
- AtomicBoolean getPersistBlocks(){
- return persistBlocks;
- }
-
- /**
- * Set whether to append a chunk.
- *
- * @param appendChunk whether to append a chunk
- */
- void setAppendChunk(boolean appendChunk){
- this.appendChunk = appendChunk;
- }
-
- /**
- * Get whether to append a chunk.
- *
- * @return whether to append a chunk
- */
- boolean getAppendChunk(){
- return appendChunk;
- }
-
- /**
- * @return the last exception
- */
- LastExceptionInStreamer getLastException(){
- return lastException;
- }
-
- /**
- * set socket to null
- */
- void setSocketToNull() {
- this.s = null;
- }
-
- /**
- * return current sequence number and then increase it by 1
- *
- * @return current sequence number before increasing
- */
- long getAndIncCurrentSeqno() {
- long old = this.currentSeqno;
- this.currentSeqno++;
- return old;
- }
-
- /**
- * get last queued sequence number
- *
- * @return last queued sequence number
- */
- long getLastQueuedSeqno() {
- return lastQueuedSeqno;
- }
-
- /**
- * get the number of bytes of current block
- *
- * @return the number of bytes of current block
- */
- long getBytesCurBlock() {
- return bytesCurBlock;
- }
-
- /**
- * set the bytes of current block that have been written
- *
- * @param bytesCurBlock bytes of current block that have been written
- */
- void setBytesCurBlock(long bytesCurBlock) {
- this.bytesCurBlock = bytesCurBlock;
- }
-
- /**
- * Increase the byte count of the current block by len.
- *
- * @param len the number of bytes to add to the current block
- */
- void incBytesCurBlock(long len) {
- this.bytesCurBlock += len;
- }
-
- /**
- * Set an artificial slowdown for unit tests.
- *
- * @param period the artificial slowdown period
- */
- void setArtificialSlowdown(long period) {
- this.artificialSlowdown = period;
- }
-
- /**
- * Check whether this streamer is to terminate.
- *
- * @return true if this streamer is to terminate
- */
- boolean streamerClosed(){
- return streamerClosed;
- }
-
- void closeSocket() throws IOException {
- if (s != null) {
- s.close();
- }
- }
-
- @Override
- public String toString() {
- return (block == null? null: block.getLocalBlock())
- + "@" + Arrays.toString(getNodes());
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
index ef9f27a..b6bf6cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
@@ -31,9 +31,7 @@ public class HdfsConfiguration extends Configuration {
addDeprecatedKeys();
// adds the default resources
- Configuration.addDefaultResource("hdfs-default.xml");
- Configuration.addDefaultResource("hdfs-site.xml");
-
+ HdfsConfigurationLoader.init();
}
public HdfsConfiguration() {
@@ -52,9 +50,10 @@ public class HdfsConfiguration extends Configuration {
* This method is here so that when invoked, HdfsConfiguration is class-loaded if
it has not been loaded previously. Upon loading the class, the static
* initializer block above will be executed to add the deprecated keys and to add
- * the default resources. It is safe for this method to be called multiple times
- * as the static initializer block will only get invoked once.
- *
+ * the default resources via {@link HdfsConfigurationLoader#init()}. It is
+ * safe for this method to be called multiple times as the static initializer
+ * block will only get invoked once.
+ *
This replaces the previous, dangerous practice of other classes calling
* Configuration.addDefaultResource("hdfs-default.xml") directly without loading
* HdfsConfiguration class first, thereby skipping the key deprecation
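
The hunk above documents a force-class-loading idiom: callers invoke a no-op static method purely so the JVM runs the static initializer exactly once. A condensed sketch of the pattern as described:

    public class HdfsConfiguration extends Configuration {
      static {
        addDeprecatedKeys();
        HdfsConfigurationLoader.init();   // registers hdfs-default.xml / hdfs-site.xml
      }

      /** No-op; loading this class already ran the static block above. */
      public static void init() {
      }
    }
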
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
deleted file mode 100644
index f03e179..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.security.token.Token;
-
-@InterfaceAudience.Private
-public interface RemotePeerFactory {
- /**
- * @param addr The address to connect to.
- * @param blockToken Token used during optional SASL negotiation
- * @param datanodeId ID of destination DataNode
- * @return A new Peer connected to the address.
- *
- * @throws IOException If there was an error connecting or creating
- * the remote socket, encrypted stream, etc.
- */
- Peer newConnectedPeer(InetSocketAddress addr,
- Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
- throws IOException;
-}
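
As rough orientation, an implementation of this interface boils down to opening a socket to the DataNode and wrapping it in a Peer; peerFromSocket below is a hypothetical adapter (the real client additionally runs SASL negotiation with the block token and configures timeouts from its conf):

    RemotePeerFactory factory = new RemotePeerFactory() {
      @Override
      public Peer newConnectedPeer(InetSocketAddress addr,
          Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
          throws IOException {
        Socket sock = new Socket();
        sock.connect(addr, 60000);        // connect timeout, illustrative
        sock.setSoTimeout(60000);
        return peerFromSocket(sock);      // hypothetical socket-to-Peer adapter
      }
    };
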
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
deleted file mode 100644
index ec17bb8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Thrown when an unknown cipher suite is encountered.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class UnknownCipherSuiteException extends IOException {
- public UnknownCipherSuiteException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
deleted file mode 100644
index 0aac8c8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class UnknownCryptoProtocolVersionException extends IOException {
- private static final long serialVersionUID = 8957192l;
-
- public UnknownCryptoProtocolVersionException() {
- super();
- }
-
- public UnknownCryptoProtocolVersionException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
deleted file mode 100644
index 2655c40..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.XAttr;
-import org.apache.hadoop.fs.XAttr.NameSpace;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-@InterfaceAudience.Private
-public class XAttrHelper {
-
- /**
- * Build <code>XAttr</code> from xattr name with prefix.
- */
- public static XAttr buildXAttr(String name) {
- return buildXAttr(name, null);
- }
-
- /**
- * Build <code>XAttr</code> from name with prefix and value.
- * Name cannot be null; value can be null. The name and prefix
- * are validated. The name is case sensitive, while the namespace
- * prefix is matched case-insensitively.
- */
- public static XAttr buildXAttr(String name, byte[] value) {
- Preconditions.checkNotNull(name, "XAttr name cannot be null.");
-
- final int prefixIndex = name.indexOf(".");
- if (prefixIndex < 3) {// Prefix length is at least 3.
- throw new HadoopIllegalArgumentException("An XAttr name must be " +
- "prefixed with user/trusted/security/system/raw, followed by a '.'");
- } else if (prefixIndex == name.length() - 1) {
- throw new HadoopIllegalArgumentException("XAttr name cannot be empty.");
- }
-
- NameSpace ns;
- final String prefix = name.substring(0, prefixIndex);
- if (StringUtils.equalsIgnoreCase(prefix, NameSpace.USER.toString())) {
- ns = NameSpace.USER;
- } else if (
- StringUtils.equalsIgnoreCase(prefix, NameSpace.TRUSTED.toString())) {
- ns = NameSpace.TRUSTED;
- } else if (
- StringUtils.equalsIgnoreCase(prefix, NameSpace.SYSTEM.toString())) {
- ns = NameSpace.SYSTEM;
- } else if (
- StringUtils.equalsIgnoreCase(prefix, NameSpace.SECURITY.toString())) {
- ns = NameSpace.SECURITY;
- } else if (
- StringUtils.equalsIgnoreCase(prefix, NameSpace.RAW.toString())) {
- ns = NameSpace.RAW;
- } else {
- throw new HadoopIllegalArgumentException("An XAttr name must be " +
- "prefixed with user/trusted/security/system/raw, followed by a '.'");
- }
- XAttr xAttr = (new XAttr.Builder()).setNameSpace(ns).setName(name.
- substring(prefixIndex + 1)).setValue(value).build();
-
- return xAttr;
- }
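
In short, the prefix before the first '.' selects the namespace and is stripped from the stored name; an unrecognized prefix throws HadoopIllegalArgumentException. A small usage sketch:

    byte[] value = "v1".getBytes(StandardCharsets.UTF_8);
    XAttr a = XAttrHelper.buildXAttr("user.key1", value);
    // a.getNameSpace() == NameSpace.USER, a.getName() == "key1"
    // "USER.key1" also works: the prefix match is case-insensitive
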
-
- /**
- * Build a single-element <code>XAttr</code> list from an xattr name with prefix.
- */
- public static List<XAttr> buildXAttrAsList(String name) {
- XAttr xAttr = buildXAttr(name);
- List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
- xAttrs.add(xAttr);
-
- return xAttrs;
- }
-
- /**
- * Get value of first xattr from <code>XAttr</code> list
- */
- public static byte[] getFirstXAttrValue(List<XAttr> xAttrs) {
- byte[] value = null;
- XAttr xAttr = getFirstXAttr(xAttrs);
- if (xAttr != null) {
- value = xAttr.getValue();
- if (value == null) {
- value = new byte[0]; // xattr exists, but no value.
- }
- }
- return value;
- }
-
- /**
- * Get first xattr from <code>XAttr</code> list
- */
- public static XAttr getFirstXAttr(List<XAttr> xAttrs) {
- if (xAttrs != null && !xAttrs.isEmpty()) {
- return xAttrs.get(0);
- }
-
- return null;
- }
-
- /**
- * Build an xattr map from an <code>XAttr</code> list; the key is the
- * xattr name with prefix, and the value is the xattr value.
- */
- public static Map<String, byte[]> buildXAttrMap(List<XAttr> xAttrs) {
- if (xAttrs == null) {
- return null;
- }
- Map<String, byte[]> xAttrMap = Maps.newHashMap();
- for (XAttr xAttr : xAttrs) {
- String name = getPrefixedName(xAttr);
- byte[] value = xAttr.getValue();
- if (value == null) {
- value = new byte[0];
- }
- xAttrMap.put(name, value);
- }
-
- return xAttrMap;
- }
-
- /**
- * Get name with prefix from <code>XAttr</code>
- */
- public static String getPrefixedName(XAttr xAttr) {
- if (xAttr == null) {
- return null;
- }
-
- return getPrefixedName(xAttr.getNameSpace(), xAttr.getName());
- }
-
- public static String getPrefixedName(XAttr.NameSpace ns, String name) {
- return StringUtils.toLowerCase(ns.toString()) + "." + name;
- }
-
- /**
- * Build <code>XAttr</code> list from xattr name list.
- */
- public static List<XAttr> buildXAttrs(List<String> names) {
- if (names == null || names.isEmpty()) {
- throw new HadoopIllegalArgumentException("XAttr names can not be " +
- "null or empty.");
- }
-
- List<XAttr> xAttrs = Lists.newArrayListWithCapacity(names.size());
- for (String name : names) {
- xAttrs.add(buildXAttr(name, null));
- }
- return xAttrs;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
deleted file mode 100644
index e8ac686..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client;
-
-import java.io.InputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.crypto.CryptoInputStream;
-import org.apache.hadoop.hdfs.DFSInputStream;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-
-import com.google.common.base.Preconditions;
-
-/**
- * The Hdfs implementation of {@link FSDataInputStream}.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class HdfsDataInputStream extends FSDataInputStream {
- public HdfsDataInputStream(DFSInputStream in) throws IOException {
- super(in);
- }
-
- public HdfsDataInputStream(CryptoInputStream in) throws IOException {
- super(in);
- Preconditions.checkArgument(in.getWrappedStream() instanceof DFSInputStream,
- "CryptoInputStream should wrap a DFSInputStream");
- }
-
- private DFSInputStream getDFSInputStream() {
- if (in instanceof CryptoInputStream) {
- return (DFSInputStream) ((CryptoInputStream) in).getWrappedStream();
- }
- return (DFSInputStream) in;
- }
-
- /**
- * Get a reference to the wrapped input stream. We always want to return
- * the actual underlying InputStream, even when we're using a
- * CryptoStream, e.g. in the delegated methods below.
- *
- * @return the underlying input stream
- */
- public InputStream getWrappedStream() {
- return in;
- }
-
- /**
- * Get the datanode from which the stream is currently reading.
- */
- public DatanodeInfo getCurrentDatanode() {
- return getDFSInputStream().getCurrentDatanode();
- }
-
- /**
- * Get the block containing the target position.
- */
- public ExtendedBlock getCurrentBlock() {
- return getDFSInputStream().getCurrentBlock();
- }
-
- /**
- * Get the collection of blocks that have already been located.
- */
- public List<LocatedBlock> getAllBlocks() throws IOException {
- return getDFSInputStream().getAllBlocks();
- }
-
- /**
- * Get the visible length of the file. It will include the length of the last
- * block even if that is in UnderConstruction state.
- *
- * @return The visible length of the file.
- */
- public long getVisibleLength() throws IOException {
- return getDFSInputStream().getFileLength();
- }
-
- /**
- * Get statistics about the reads which this DFSInputStream has done.
- * Note that because HdfsDataInputStream is buffered, these stats may
- * be higher than you would expect just by adding up the number of
- * bytes read through HdfsDataInputStream.
- */
- public DFSInputStream.ReadStatistics getReadStatistics() {
- return getDFSInputStream().getReadStatistics();
- }
-
- public void clearReadStatistics() {
- getDFSInputStream().clearReadStatistics();
- }
-}
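
A usage sketch (fs and the path are placeholders): on HDFS, the stream returned by FileSystem#open is typically an HdfsDataInputStream, so the HDFS-specific accessors are reached by a down-cast:

    FSDataInputStream raw = fs.open(new Path("/tmp/file"));
    if (raw instanceof HdfsDataInputStream) {
      HdfsDataInputStream in = (HdfsDataInputStream) raw;
      in.read();   // read something first so a DataNode connection exists
      System.out.println("reading from " + in.getCurrentDatanode());
      System.out.println("visible length: " + in.getVisibleLength());
    }
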