Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:03 UTC
[03/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
deleted file mode 100755
index de1d1ee..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ /dev/null
@@ -1,918 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.channels.ClosedChannelException;
-import java.util.EnumSet;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.crypto.CryptoProtocolVersion;
-import org.apache.hadoop.fs.CanSetDropBehind;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
-import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DataChecksum.Type;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/****************************************************************
- * DFSOutputStream creates files from a stream of bytes.
- *
- * The client application writes data that is cached internally by
- * this stream. Data is broken up into packets, each typically 64K in
- * size. A packet is composed of chunks, each typically 512 bytes with
- * an associated checksum.
- *
- * When a client application fills up the currentPacket, it is
- * enqueued into the dataQueue of DataStreamer. DataStreamer is a
- * thread that picks up packets from the dataQueue and sends them to
- * the first datanode in the pipeline.
- *
- ****************************************************************/
-@InterfaceAudience.Private
-public class DFSOutputStream extends FSOutputSummer
- implements Syncable, CanSetDropBehind {
- static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
- /**
- * Number of times to retry creating a file when there are transient
- * errors (typically related to encryption zones and KeyProvider operations).
- */
- @VisibleForTesting
- static final int CREATE_RETRY_COUNT = 10;
- @VisibleForTesting
- static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
- CryptoProtocolVersion.supported();
-
- protected final DFSClient dfsClient;
- protected final ByteArrayManager byteArrayManager;
- // closed is accessed by different threads under different locks.
- protected volatile boolean closed = false;
-
- protected final String src;
- protected final long fileId;
- protected final long blockSize;
- protected final int bytesPerChecksum;
-
- protected DFSPacket currentPacket = null;
- private DataStreamer streamer;
- protected int packetSize = 0; // write packet size, not including the header.
- protected int chunksPerPacket = 0;
- protected long lastFlushOffset = 0; // offset when flush was invoked
- private long initialFileSize = 0; // at time of file open
- private final short blockReplication; // replication factor of file
- protected boolean shouldSyncBlock = false; // force blocks to disk upon close
- protected final AtomicReference<CachingStrategy> cachingStrategy;
- private FileEncryptionInfo fileEncryptionInfo;
-
- /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
- protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
- long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
- final byte[] buf;
- final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
-
- try {
- buf = byteArrayManager.newByteArray(bufferSize);
- } catch (InterruptedException ie) {
- final InterruptedIOException iioe = new InterruptedIOException(
- "seqno=" + seqno);
- iioe.initCause(ie);
- throw iioe;
- }
-
- return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
- getChecksumSize(), lastPacketInBlock);
- }
-
- @Override
- protected void checkClosed() throws IOException {
- if (isClosed()) {
- getStreamer().getLastException().throwException4Close();
- }
- }
-
- //
- // returns the list of targets, if any, that are currently being used.
- //
- @VisibleForTesting
- public synchronized DatanodeInfo[] getPipeline() {
- if (getStreamer().streamerClosed()) {
- return null;
- }
- DatanodeInfo[] currentNodes = getStreamer().getNodes();
- if (currentNodes == null) {
- return null;
- }
- DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
- for (int i = 0; i < currentNodes.length; i++) {
- value[i] = currentNodes[i];
- }
- return value;
- }
-
- /**
- * @return the object for computing checksum.
- * The type is NULL if checksum is not computed.
- */
- private static DataChecksum getChecksum4Compute(DataChecksum checksum,
- HdfsFileStatus stat) {
- if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
- // do not compute checksum for writing to single replica to memory
- return DataChecksum.newDataChecksum(Type.NULL,
- checksum.getBytesPerChecksum());
- }
- return checksum;
- }
-
- private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
- HdfsFileStatus stat, DataChecksum checksum) throws IOException {
- super(getChecksum4Compute(checksum, stat));
- this.dfsClient = dfsClient;
- this.src = src;
- this.fileId = stat.getFileId();
- this.blockSize = stat.getBlockSize();
- this.blockReplication = stat.getReplication();
- this.fileEncryptionInfo = stat.getFileEncryptionInfo();
- this.cachingStrategy = new AtomicReference<CachingStrategy>(
- dfsClient.getDefaultWriteCachingStrategy());
- if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug(
- "Set non-null progress callback on DFSOutputStream " + src);
- }
-
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum <= 0) {
- throw new HadoopIllegalArgumentException(
- "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
- }
- if (blockSize % bytesPerChecksum != 0) {
- throw new HadoopIllegalArgumentException("Invalid values: "
- + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
- + ") must divide block size (=" + blockSize + ").");
- }
- this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
- }
-
- /** Construct a new output stream for creating a file. */
- protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
- EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
- this(dfsClient, src, progress, stat, checksum);
- this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
-
- computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
-
- streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
- cachingStrategy, byteArrayManager, favoredNodes);
- }
-
- static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
- FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
- short replication, long blockSize, Progressable progress, int buffersize,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("newStreamForCreate", src);
- try {
- HdfsFileStatus stat = null;
-
- // Retry the create if we get a RetryStartFileException up to a maximum
- // number of times
- boolean shouldRetry = true;
- int retryCount = CREATE_RETRY_COUNT;
- while (shouldRetry) {
- shouldRetry = false;
- try {
- stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
- new EnumSetWritable<CreateFlag>(flag), createParent, replication,
- blockSize, SUPPORTED_CRYPTO_VERSIONS);
- break;
- } catch (RemoteException re) {
- IOException e = re.unwrapRemoteException(
- AccessControlException.class,
- DSQuotaExceededException.class,
- QuotaByStorageTypeExceededException.class,
- FileAlreadyExistsException.class,
- FileNotFoundException.class,
- ParentNotDirectoryException.class,
- NSQuotaExceededException.class,
- RetryStartFileException.class,
- SafeModeException.class,
- UnresolvedPathException.class,
- SnapshotAccessControlException.class,
- UnknownCryptoProtocolVersionException.class);
- if (e instanceof RetryStartFileException) {
- if (retryCount > 0) {
- shouldRetry = true;
- retryCount--;
- } else {
- throw new IOException("Too many retries because of encryption" +
- " zone operations", e);
- }
- } else {
- throw e;
- }
- }
- }
- Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
- out.start();
- return out;
- } finally {
- scope.close();
- }
- }
-
- /** Construct a new output stream for append. */
- private DFSOutputStream(DFSClient dfsClient, String src,
- EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
- HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
- throws IOException {
- this(dfsClient, src, progress, stat, checksum);
- initialFileSize = stat.getLen(); // length of file when opened
- this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
-
- boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
-
- this.fileEncryptionInfo = stat.getFileEncryptionInfo();
-
- // The last partial block of the file has to be filled.
- if (!toNewBlock && lastBlock != null) {
- // indicate that we are appending to an existing block
- streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
- cachingStrategy, byteArrayManager);
- getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
- adjustPacketChunkSize(stat);
- getStreamer().setPipelineInConstruction(lastBlock);
- } else {
- computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
- bytesPerChecksum);
- streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
- dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
- favoredNodes);
- }
- }
-
- private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
-
- long usedInLastBlock = stat.getLen() % blockSize;
- int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
- // calculate the amount of free space in the pre-existing
- // last crc chunk
- int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
- int freeInCksum = bytesPerChecksum - usedInCksum;
-
- // if there is space in the last block, then we have to
- // append to that block
- if (freeInLastBlock == blockSize) {
- throw new IOException("The last block for file " +
- src + " is full.");
- }
-
- if (usedInCksum > 0 && freeInCksum > 0) {
- // if there is space in the last partial chunk, then
- // setup in such a way that the next packet will have only
- // one chunk that fills up the partial chunk.
- //
- computePacketChunkSize(0, freeInCksum);
- setChecksumBufSize(freeInCksum);
- getStreamer().setAppendChunk(true);
- } else {
- // if the remaining space in the block is smaller than
- // the expected size of a packet, then create a
- // smaller packet.
- //
- computePacketChunkSize(
- Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
- bytesPerChecksum);
- }
- }
-
- static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
- EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
- LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
- String[] favoredNodes) throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("newStreamForAppend", src);
- try {
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
- progress, lastBlock, stat, checksum, favoredNodes);
- out.start();
- return out;
- } finally {
- scope.close();
- }
- }
-
- protected void computePacketChunkSize(int psize, int csize) {
- final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
- final int chunkSize = csize + getChecksumSize();
- chunksPerPacket = Math.max(bodySize/chunkSize, 1);
- packetSize = chunkSize*chunksPerPacket;
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
- ", chunkSize=" + chunkSize +
- ", chunksPerPacket=" + chunksPerPacket +
- ", packetSize=" + packetSize);
- }
- }
-
- protected TraceScope createWriteTraceScope() {
- return dfsClient.getPathTraceScope("DFSOutputStream#write", src);
- }
-
- // @see FSOutputSummer#writeChunk()
- @Override
- protected synchronized void writeChunk(byte[] b, int offset, int len,
- byte[] checksum, int ckoff, int cklen) throws IOException {
- dfsClient.checkOpen();
- checkClosed();
-
- if (len > bytesPerChecksum) {
- throw new IOException("writeChunk() buffer size is " + len +
- " is larger than supported bytesPerChecksum " +
- bytesPerChecksum);
- }
- if (cklen != 0 && cklen != getChecksumSize()) {
- throw new IOException("writeChunk() checksum size is supposed to be " +
- getChecksumSize() + " but found to be " + cklen);
- }
-
- if (currentPacket == null) {
- currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
- .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
- currentPacket.getSeqno() +
- ", src=" + src +
- ", packetSize=" + packetSize +
- ", chunksPerPacket=" + chunksPerPacket +
- ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
- }
- }
-
- currentPacket.writeChecksum(checksum, ckoff, cklen);
- currentPacket.writeData(b, offset, len);
- currentPacket.incNumChunks();
- getStreamer().incBytesCurBlock(len);
-
- // If packet is full, enqueue it for transmission
- //
- if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
- getStreamer().getBytesCurBlock() == blockSize) {
- enqueueCurrentPacketFull();
- }
- }
-
- void enqueueCurrentPacket() throws IOException {
- getStreamer().waitAndQueuePacket(currentPacket);
- currentPacket = null;
- }
-
- void enqueueCurrentPacketFull() throws IOException {
- LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
- + " appendChunk={}, {}", currentPacket, src, getStreamer()
- .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
- getStreamer());
- enqueueCurrentPacket();
- adjustChunkBoundary();
- endBlock();
- }
-
- /** create an empty packet to mark the end of the block. */
- void setCurrentPacketToEmpty() throws InterruptedIOException {
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
- getStreamer().getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock);
- }
-
- /**
- * If the reopened file did not end at a chunk boundary and the above
- * write filled up its partial chunk, tell the summer to generate full
- * crc chunks from now on.
- */
- protected void adjustChunkBoundary() {
- if (getStreamer().getAppendChunk() &&
- getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
- getStreamer().setAppendChunk(false);
- resetChecksumBufSize();
- }
-
- if (!getStreamer().getAppendChunk()) {
- int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
- dfsClient.getConf().getWritePacketSize());
- computePacketChunkSize(psize, bytesPerChecksum);
- }
- }
-
- /**
- * if encountering a block boundary, send an empty packet to
- * indicate the end of block and reset bytesCurBlock.
- *
- * @throws IOException
- */
- protected void endBlock() throws IOException {
- if (getStreamer().getBytesCurBlock() == blockSize) {
- setCurrentPacketToEmpty();
- enqueueCurrentPacket();
- getStreamer().setBytesCurBlock(0);
- lastFlushOffset = 0;
- }
- }
-
- /**
- * Flushes out to all replicas of the block. The data is in the buffers
- * of the DNs but not necessarily in the DN's OS buffers.
- *
- * It is a synchronous operation. When it returns,
- * it guarantees that flushed data become visible to new readers.
- * It is not guaranteed that data has been flushed to
- * persistent store on the datanode.
- * Block allocations are persisted on namenode.
- */
- @Override
- public void hflush() throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("hflush", src);
- try {
- flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
- } finally {
- scope.close();
- }
- }
-
- @Override
- public void hsync() throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("hsync", src);
- try {
- flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
- } finally {
- scope.close();
- }
- }
-
- /**
- * The expected semantics are that all data has been flushed out to all
- * replicas and all replicas have done the equivalent of a POSIX fsync,
- * i.e. the OS has flushed it to the disk device (but the disk may have
- * it in its cache).
- *
- * Note that only the current block is flushed to the disk device.
- * To guarantee durable sync across block boundaries the stream should
- * be created with {@link CreateFlag#SYNC_BLOCK}.
- *
- * @param syncFlags
- * Indicates the semantics of the sync. Currently used to specify
- * whether or not to update the block length in NameNode.
- */
- public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("hsync", src);
- try {
- flushOrSync(true, syncFlags);
- } finally {
- scope.close();
- }
- }
-
- /**
- * Flush/Sync buffered data to DataNodes.
- *
- * @param isSync
- * Whether or not to require all replicas to flush data to the disk
- * device
- * @param syncFlags
- * Indicates the detailed semantics of the flush/sync. Currently
- * mainly used to specify whether or not to update the file length in
- * the NameNode
- * @throws IOException
- */
- private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
- throws IOException {
- dfsClient.checkOpen();
- checkClosed();
- try {
- long toWaitFor;
- long lastBlockLength = -1L;
- boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
- boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
- synchronized (this) {
- // flush checksum buffer, but keep checksum buffer intact if we do not
- // need to end the current block
- int numKept = flushBuffer(!endBlock, true);
- // bytesCurBlock potentially incremented if there was buffered data
-
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient flush(): "
- + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
- + " lastFlushOffset=" + lastFlushOffset
- + " createNewBlock=" + endBlock);
- }
- // Flush only if we haven't already flushed till this offset.
- if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
- assert getStreamer().getBytesCurBlock() > lastFlushOffset;
- // record the valid offset of this flush
- lastFlushOffset = getStreamer().getBytesCurBlock();
- if (isSync && currentPacket == null && !endBlock) {
- // Nothing to send right now,
- // but sync was requested.
- // Send an empty packet if we do not end the block right now
- currentPacket = createPacket(packetSize, chunksPerPacket,
- getStreamer().getBytesCurBlock(), getStreamer()
- .getAndIncCurrentSeqno(), false);
- }
- } else {
- if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
- // Nothing to send right now,
- // and the block was partially written,
- // and sync was requested.
- // So send an empty sync packet if we do not end the block right
- // now
- currentPacket = createPacket(packetSize, chunksPerPacket,
- getStreamer().getBytesCurBlock(), getStreamer()
- .getAndIncCurrentSeqno(), false);
- } else if (currentPacket != null) {
- // just discard the current packet since it has already been sent.
- currentPacket.releaseBuffer(byteArrayManager);
- currentPacket = null;
- }
- }
- if (currentPacket != null) {
- currentPacket.setSyncBlock(isSync);
- enqueueCurrentPacket();
- }
- if (endBlock && getStreamer().getBytesCurBlock() > 0) {
- // Need to end the current block, thus send an empty packet to
- // indicate this is the end of the block and reset bytesCurBlock
- currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
- getStreamer().getAndIncCurrentSeqno(), true);
- currentPacket.setSyncBlock(shouldSyncBlock || isSync);
- enqueueCurrentPacket();
- getStreamer().setBytesCurBlock(0);
- lastFlushOffset = 0;
- } else {
- // Restore state of stream. Record the last flush offset
- // of the last full chunk that was flushed.
- getStreamer().setBytesCurBlock(
- getStreamer().getBytesCurBlock() - numKept);
- }
-
- toWaitFor = getStreamer().getLastQueuedSeqno();
- } // end synchronized
-
- getStreamer().waitForAckedSeqno(toWaitFor);
-
- // update the block length first time irrespective of flag
- if (updateLength || getStreamer().getPersistBlocks().get()) {
- synchronized (this) {
- if (!getStreamer().streamerClosed()
- && getStreamer().getBlock() != null) {
- lastBlockLength = getStreamer().getBlock().getNumBytes();
- }
- }
- }
- // If 1) any new blocks were allocated since the last flush, or 2) the
- // length needs to be updated in the NN, then persist block locations on
- // the namenode.
- if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
- try {
- dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
- lastBlockLength);
- } catch (IOException ioe) {
- DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
- // If we got an error here, it might be because some other thread called
- // close before our hflush completed. In that case, we should throw an
- // exception that the stream is closed.
- checkClosed();
- // If we aren't closed but failed to sync, we should expose that to the
- // caller.
- throw ioe;
- }
- }
-
- synchronized(this) {
- if (!getStreamer().streamerClosed()) {
- getStreamer().setHflush();
- }
- }
- } catch (InterruptedIOException interrupt) {
- // This kind of error doesn't mean that the stream itself is broken - just the
- // flushing thread got interrupted. So, we shouldn't close down the writer,
- // but instead just propagate the error
- throw interrupt;
- } catch (IOException e) {
- DFSClient.LOG.warn("Error while syncing", e);
- synchronized (this) {
- if (!isClosed()) {
- getStreamer().getLastException().set(e);
- closeThreads(true);
- }
- }
- throw e;
- }
- }
-
- /**
- * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
- */
- @Deprecated
- public synchronized int getNumCurrentReplicas() throws IOException {
- return getCurrentBlockReplication();
- }
-
- /**
- * Note that this is not a public API;
- * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
- *
- * @return the number of valid replicas of the current block
- */
- public synchronized int getCurrentBlockReplication() throws IOException {
- dfsClient.checkOpen();
- checkClosed();
- if (getStreamer().streamerClosed()) {
- return blockReplication; // no pipeline, return repl factor of file
- }
- DatanodeInfo[] currentNodes = getStreamer().getNodes();
- if (currentNodes == null) {
- return blockReplication; // no pipeline, return repl factor of file
- }
- return currentNodes.length;
- }
-
- /**
- * Waits till all existing data is flushed and confirmations
- * received from datanodes.
- */
- protected void flushInternal() throws IOException {
- long toWaitFor;
- synchronized (this) {
- dfsClient.checkOpen();
- checkClosed();
- //
- // If there is data in the current buffer, send it across
- //
- getStreamer().queuePacket(currentPacket);
- currentPacket = null;
- toWaitFor = getStreamer().getLastQueuedSeqno();
- }
-
- getStreamer().waitForAckedSeqno(toWaitFor);
- }
-
- protected synchronized void start() {
- getStreamer().start();
- }
-
- /**
- * Aborts this output stream and releases any system
- * resources associated with this stream.
- */
- synchronized void abort() throws IOException {
- if (isClosed()) {
- return;
- }
- getStreamer().getLastException().set(new IOException("Lease timeout of "
- + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
- closeThreads(true);
- dfsClient.endFileLease(fileId);
- }
-
- boolean isClosed() {
- return closed || getStreamer().streamerClosed();
- }
-
- void setClosed() {
- closed = true;
- getStreamer().release();
- }
-
- // shutdown datastreamer and responseprocessor threads.
- // interrupt datastreamer if force is true
- protected void closeThreads(boolean force) throws IOException {
- try {
- getStreamer().close(force);
- getStreamer().join();
- getStreamer().closeSocket();
- } catch (InterruptedException e) {
- throw new IOException("Failed to shutdown streamer");
- } finally {
- getStreamer().setSocketToNull();
- setClosed();
- }
- }
-
- /**
- * Closes this output stream and releases any system
- * resources associated with this stream.
- */
- @Override
- public synchronized void close() throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("DFSOutputStream#close", src);
- try {
- closeImpl();
- } finally {
- scope.close();
- }
- }
-
- protected synchronized void closeImpl() throws IOException {
- if (isClosed()) {
- getStreamer().getLastException().check(true);
- return;
- }
-
- try {
- flushBuffer(); // flush from all upper layers
-
- if (currentPacket != null) {
- enqueueCurrentPacket();
- }
-
- if (getStreamer().getBytesCurBlock() != 0) {
- setCurrentPacketToEmpty();
- }
-
- flushInternal(); // flush all data to Datanodes
- // get last block before destroying the streamer
- ExtendedBlock lastBlock = getStreamer().getBlock();
- closeThreads(false);
- TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
- try {
- completeFile(lastBlock);
- } finally {
- scope.close();
- }
- dfsClient.endFileLease(fileId);
- } catch (ClosedChannelException e) {
- } finally {
- setClosed();
- }
- }
-
- // should be called holding (this) lock since setTestFilename() may
- // be called during unit tests
- protected void completeFile(ExtendedBlock last) throws IOException {
- long localstart = Time.monotonicNow();
- final DfsClientConf conf = dfsClient.getConf();
- long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
- boolean fileComplete = false;
- int retries = conf.getNumBlockWriteLocateFollowingRetry();
- while (!fileComplete) {
- fileComplete =
- dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
- if (!fileComplete) {
- final int hdfsTimeout = conf.getHdfsTimeout();
- if (!dfsClient.clientRunning
- || (hdfsTimeout > 0
- && localstart + hdfsTimeout < Time.monotonicNow())) {
- String msg = "Unable to close file because dfsclient " +
- " was unable to contact the HDFS servers." +
- " clientRunning " + dfsClient.clientRunning +
- " hdfsTimeout " + hdfsTimeout;
- DFSClient.LOG.info(msg);
- throw new IOException(msg);
- }
- try {
- if (retries == 0) {
- throw new IOException("Unable to close file because the last block"
- + " does not have enough number of replicas.");
- }
- retries--;
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- if (Time.monotonicNow() - localstart > 5000) {
- DFSClient.LOG.info("Could not complete " + src + " retrying...");
- }
- } catch (InterruptedException ie) {
- DFSClient.LOG.warn("Caught exception ", ie);
- }
- }
- }
- }
-
- @VisibleForTesting
- public void setArtificialSlowdown(long period) {
- getStreamer().setArtificialSlowdown(period);
- }
-
- @VisibleForTesting
- public synchronized void setChunksPerPacket(int value) {
- chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
- }
-
- /**
- * Returns the size of a file as it was when this stream was opened
- */
- public long getInitialLen() {
- return initialFileSize;
- }
-
- /**
- * @return the FileEncryptionInfo for this stream, or null if not encrypted.
- */
- public FileEncryptionInfo getFileEncryptionInfo() {
- return fileEncryptionInfo;
- }
-
- /**
- * Returns the access token currently used by streamer, for testing only
- */
- synchronized Token<BlockTokenIdentifier> getBlockToken() {
- return getStreamer().getBlockToken();
- }
-
- @Override
- public void setDropBehind(Boolean dropBehind) throws IOException {
- CachingStrategy prevStrategy, nextStrategy;
- // CachingStrategy is immutable. So build a new CachingStrategy with the
- // modifications we want, and compare-and-swap it in.
- do {
- prevStrategy = this.cachingStrategy.get();
- nextStrategy = new CachingStrategy.Builder(prevStrategy).
- setDropBehind(dropBehind).build();
- } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
- }
-
- @VisibleForTesting
- ExtendedBlock getBlock() {
- return getStreamer().getBlock();
- }
-
- @VisibleForTesting
- public long getFileId() {
- return fileId;
- }
-
- /**
- * Return the source of stream.
- */
- String getSrc() {
- return src;
- }
-
- /**
- * Returns the data streamer object.
- */
- protected DataStreamer getStreamer() {
- return streamer;
- }
-}
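
As a side note for reviewers: the hflush/hsync semantics documented in the javadoc above are normally reached through the public FileSystem API rather than by constructing DFSOutputStream directly. The following sketch is illustrative only and is not part of this patch; the path and the assumption that fs.defaultFS points at an HDFS cluster are mine, and it simply exercises Syncable the way the javadoc describes.

    // Illustrative only -- not part of this change. Shows hflush (visibility to new
    // readers) vs. hsync (durable on the datanodes), per the javadoc above.
    import java.nio.charset.StandardCharsets;
    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    public class HflushHsyncSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);                 // assumes fs.defaultFS is an HDFS URI
        Path path = new Path("/tmp/hflush-hsync-sketch.txt"); // illustrative path

        try (FSDataOutputStream out = fs.create(path)) {
          out.write("first record\n".getBytes(StandardCharsets.UTF_8));
          // hflush: flushed data becomes visible to new readers, but is not
          // guaranteed to be on persistent storage on the datanodes.
          out.hflush();

          out.write("second record\n".getBytes(StandardCharsets.UTF_8));
          // hsync: replicas do the equivalent of an fsync; UPDATE_LENGTH also
          // persists the new file length on the NameNode.
          if (out instanceof HdfsDataOutputStream) {
            ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
          } else {
            out.hsync();
          }
        }
      }
    }
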
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
deleted file mode 100755
index 22055c3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.channels.ClosedChannelException;
-import java.util.Arrays;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.util.ByteArrayManager;
-import org.apache.htrace.Span;
-
-/****************************************************************
- * DFSPacket is used by DataStreamer and DFSOutputStream.
- * DFSOutputStream generates packets and then asks DataStreamer
- * to send them to datanodes.
- ****************************************************************/
-
-@InterfaceAudience.Private
-class DFSPacket {
- public static final long HEART_BEAT_SEQNO = -1L;
- private static long[] EMPTY = new long[0];
- private final long seqno; // sequence number of buffer in block
- private final long offsetInBlock; // offset in block
- private boolean syncBlock; // this packet forces the current block to disk
- private int numChunks; // number of chunks currently in packet
- private final int maxChunks; // max chunks in packet
- private byte[] buf;
- private final boolean lastPacketInBlock; // is this the last packet in block?
-
- /**
- * buf is pointed into as follows:
- * (C is checksum data, D is payload data)
- *
- * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
- *           ^        ^               ^               ^
- *           |        checksumPos     dataStart       dataPos
- *  checksumStart
- *
- * Right before sending, we move the checksum data to immediately precede
- * the actual data, and then insert the header into the buffer immediately
- * preceding the checksum data, so we make sure to keep enough space in
- * front of the checksum data to support the largest conceivable header.
- */
- private int checksumStart;
- private int checksumPos;
- private final int dataStart;
- private int dataPos;
- private long[] traceParents = EMPTY;
- private int traceParentsUsed;
- private Span span;
-
- /**
- * Create a new packet.
- *
- * @param buf the buffer storing data and checksums
- * @param chunksPerPkt maximum number of chunks per packet.
- * @param offsetInBlock offset in bytes into the HDFS block.
- * @param seqno the sequence number of this packet
- * @param checksumSize the size of checksum
- * @param lastPacketInBlock if this is the last packet
- */
- DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
- int checksumSize, boolean lastPacketInBlock) {
- this.lastPacketInBlock = lastPacketInBlock;
- this.numChunks = 0;
- this.offsetInBlock = offsetInBlock;
- this.seqno = seqno;
-
- this.buf = buf;
-
- checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
- checksumPos = checksumStart;
- dataStart = checksumStart + (chunksPerPkt * checksumSize);
- dataPos = dataStart;
- maxChunks = chunksPerPkt;
- }
-
- /**
- * Write data to this packet.
- *
- * @param inarray input array of data
- * @param off the offset of data to write
- * @param len the length of data to write
- * @throws ClosedChannelException
- */
- synchronized void writeData(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- checkBuffer();
- if (dataPos + len > buf.length) {
- throw new BufferOverflowException();
- }
- System.arraycopy(inarray, off, buf, dataPos, len);
- dataPos += len;
- }
-
- /**
- * Write checksums to this packet
- *
- * @param inarray input array of checksums
- * @param off the offset of checksums to write
- * @param len the length of checksums to write
- * @throws ClosedChannelException
- */
- synchronized void writeChecksum(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- checkBuffer();
- if (len == 0) {
- return;
- }
- if (checksumPos + len > dataStart) {
- throw new BufferOverflowException();
- }
- System.arraycopy(inarray, off, buf, checksumPos, len);
- checksumPos += len;
- }
-
- /**
- * Write the full packet, including the header, to the given output stream.
- *
- * @param stm
- * @throws IOException
- */
- synchronized void writeTo(DataOutputStream stm) throws IOException {
- checkBuffer();
-
- final int dataLen = dataPos - dataStart;
- final int checksumLen = checksumPos - checksumStart;
- final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
-
- PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-
- if (checksumPos != dataStart) {
- // Move the checksum to cover the gap. This can happen for the last
- // packet or during an hflush/hsync call.
- System.arraycopy(buf, checksumStart, buf,
- dataStart - checksumLen , checksumLen);
- checksumPos = dataStart;
- checksumStart = checksumPos - checksumLen;
- }
-
- final int headerStart = checksumStart - header.getSerializedSize();
- assert checksumStart + 1 >= header.getSerializedSize();
- assert headerStart >= 0;
- assert headerStart + header.getSerializedSize() == checksumStart;
-
- // Copy the header data into the buffer immediately preceding the checksum
- // data.
- System.arraycopy(header.getBytes(), 0, buf, headerStart,
- header.getSerializedSize());
-
- // corrupt the data for testing.
- if (DFSClientFaultInjector.get().corruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
-
- // Write the now contiguous full packet to the output stream.
- stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
-
- // undo corruption.
- if (DFSClientFaultInjector.get().uncorruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
- }
-
- private synchronized void checkBuffer() throws ClosedChannelException {
- if (buf == null) {
- throw new ClosedChannelException();
- }
- }
-
- /**
- * Release the buffer in this packet to ByteArrayManager.
- *
- * @param bam
- */
- synchronized void releaseBuffer(ByteArrayManager bam) {
- bam.release(buf);
- buf = null;
- }
-
- /**
- * get the packet's last byte's offset in the block
- *
- * @return the packet's last byte's offset in the block
- */
- synchronized long getLastByteOffsetBlock() {
- return offsetInBlock + dataPos - dataStart;
- }
-
- /**
- * Check if this packet is a heartbeat packet
- *
- * @return true if the sequence number is HEART_BEAT_SEQNO
- */
- boolean isHeartbeatPacket() {
- return seqno == HEART_BEAT_SEQNO;
- }
-
- /**
- * check if this packet is the last packet in block
- *
- * @return true if the packet is the last packet
- */
- boolean isLastPacketInBlock(){
- return lastPacketInBlock;
- }
-
- /**
- * get sequence number of this packet
- *
- * @return the sequence number of this packet
- */
- long getSeqno(){
- return seqno;
- }
-
- /**
- * get the number of chunks this packet contains
- *
- * @return the number of chunks in this packet
- */
- synchronized int getNumChunks(){
- return numChunks;
- }
-
- /**
- * increase the number of chunks by one
- */
- synchronized void incNumChunks(){
- numChunks++;
- }
-
- /**
- * get the maximum number of chunks this packet can hold
- *
- * @return the maximum number of chunks
- */
- int getMaxChunks(){
- return maxChunks;
- }
-
- /**
- * set whether to sync the block to disk
- *
- * @param syncBlock whether to sync the block to disk
- */
- synchronized void setSyncBlock(boolean syncBlock){
- this.syncBlock = syncBlock;
- }
-
- @Override
- public String toString() {
- return "packet seqno: " + this.seqno +
- " offsetInBlock: " + this.offsetInBlock +
- " lastPacketInBlock: " + this.lastPacketInBlock +
- " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
- }
-
- /**
- * Add a trace parent span for this packet.<p/>
- *
- * Trace parent spans for a packet are the trace spans responsible for
- * adding data to that packet. We store them as an array of longs for
- * efficiency.<p/>
- *
- * Protected by the DFSOutputStream dataQueue lock.
- */
- public void addTraceParent(Span span) {
- if (span == null) {
- return;
- }
- addTraceParent(span.getSpanId());
- }
-
- public void addTraceParent(long id) {
- if (traceParentsUsed == traceParents.length) {
- int newLength = (traceParents.length == 0) ? 8 :
- traceParents.length * 2;
- traceParents = Arrays.copyOf(traceParents, newLength);
- }
- traceParents[traceParentsUsed] = id;
- traceParentsUsed++;
- }
-
- /**
- * Get the trace parent spans for this packet.<p/>
- *
- * Will always be non-null.<p/>
- *
- * Protected by the DFSOutputStream dataQueue lock.
- */
- public long[] getTraceParents() {
- // Remove duplicates from the array.
- int len = traceParentsUsed;
- Arrays.sort(traceParents, 0, len);
- int i = 0, j = 0;
- long prevVal = 0; // 0 is not a valid span id
- while (true) {
- if (i == len) {
- break;
- }
- long val = traceParents[i];
- if (val != prevVal) {
- traceParents[j] = val;
- j++;
- prevVal = val;
- }
- i++;
- }
- if (j < traceParents.length) {
- traceParents = Arrays.copyOf(traceParents, j);
- traceParentsUsed = traceParents.length;
- }
- return traceParents;
- }
-
- public void setTraceSpan(Span span) {
- this.span = span;
- }
-
- public Span getTraceSpan() {
- return span;
- }
-}
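
For readers following the buffer-layout comment in DFSPacket, here is a small self-contained sketch (not part of this patch) of the arithmetic performed by DFSOutputStream#computePacketChunkSize() and the DFSPacket constructor. The constants are stand-ins of my own: the real header bound comes from PacketHeader.PKT_MAX_HEADER_LEN, and the chunk and packet sizes come from the client configuration and the checksum type.

    // Standalone sketch of the packet/chunk arithmetic; the constants are assumptions.
    public class PacketLayoutSketch {
      static final int PKT_MAX_HEADER_LEN = 33;       // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
      static final int BYTES_PER_CHECKSUM = 512;      // dfs.bytes-per-checksum default
      static final int CHECKSUM_SIZE = 4;             // CRC32/CRC32C checksums are 4 bytes
      static final int WRITE_PACKET_SIZE = 64 * 1024; // dfs.client-write-packet-size default

      public static void main(String[] args) {
        // Mirrors DFSOutputStream#computePacketChunkSize(psize, csize).
        int bodySize = WRITE_PACKET_SIZE - PKT_MAX_HEADER_LEN;
        int chunkSize = BYTES_PER_CHECKSUM + CHECKSUM_SIZE;
        int chunksPerPacket = Math.max(bodySize / chunkSize, 1);
        int packetSize = chunkSize * chunksPerPacket;

        // Mirrors the DFSPacket constructor: [header room][checksums][data].
        int checksumStart = PKT_MAX_HEADER_LEN;
        int dataStart = checksumStart + chunksPerPacket * CHECKSUM_SIZE;

        System.out.println("chunksPerPacket = " + chunksPerPacket);  // 126 with these numbers
        System.out.println("packetSize      = " + packetSize);       // 65016 with these numbers
        System.out.println("checksumStart   = " + checksumStart + ", dataStart = " + dataStart);
      }
    }
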
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index fe9e342..5b11ac2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1441,27 +1440,4 @@ public class DFSUtil {
return cryptoProvider;
}
- public static int getIoFileBufferSize(Configuration conf) {
- return conf.getInt(
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
- CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- }
-
- public static int getSmallBufferSize(Configuration conf) {
- return Math.min(getIoFileBufferSize(conf) / 2, 512);
- }
-
- /**
- * Probe for HDFS Encryption being enabled; this uses the value of
- * the option {@link DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI},
- * returning true if that property contains a non-empty, non-whitespace
- * string.
- * @param conf configuration to probe
- * @return true if encryption is considered enabled.
- */
- public static boolean isHDFSEncryptionEnabled(Configuration conf) {
- return !conf.getTrimmed(
- DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
- }
-
}
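
The three helpers removed from DFSUtil above (getIoFileBufferSize, getSmallBufferSize, isHDFSEncryptionEnabled) are plain Configuration lookups; this hunk does not show where they land in hadoop-hdfs-client, so no destination class is named here. The sketch below (not part of this patch) restates what they compute, spelling out the literal key names and defaults, as a reference while callers are moved over.

    // Reference sketch of the removed DFSUtil helpers; not part of this change.
    import org.apache.hadoop.conf.Configuration;

    public class RemovedDfsUtilHelpers {
      // Equivalent of DFSUtil.getIoFileBufferSize(conf):
      // io.file.buffer.size, defaulting to 4096.
      static int getIoFileBufferSize(Configuration conf) {
        return conf.getInt("io.file.buffer.size", 4096);
      }

      // Equivalent of DFSUtil.getSmallBufferSize(conf): half of the above, capped at 512.
      static int getSmallBufferSize(Configuration conf) {
        return Math.min(getIoFileBufferSize(conf) / 2, 512);
      }

      // Equivalent of DFSUtil.isHDFSEncryptionEnabled(conf): true when a key
      // provider URI is configured (non-empty after trimming).
      static boolean isHDFSEncryptionEnabled(Configuration conf) {
        return !conf.getTrimmed("dfs.encryption.key.provider.uri", "").isEmpty();
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        System.out.println("io buffer size    = " + getIoFileBufferSize(conf));
        System.out.println("small buffer size = " + getSmallBufferSize(conf));
        System.out.println("encryption on?    = " + isHDFSEncryptionEnabled(conf));
      }
    }
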