You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:38 UTC
[38/58] [abbrv] hadoop git commit: Merge remote-tracking branch
'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0000000,7a40d73..78eaa6c
mode 000000,100755..100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@@ -1,0 -1,917 +1,982 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.FileNotFoundException;
+ import java.io.IOException;
+ import java.io.InterruptedIOException;
+ import java.nio.channels.ClosedChannelException;
+ import java.util.EnumSet;
+ import java.util.concurrent.atomic.AtomicReference;
+
+ import org.apache.hadoop.HadoopIllegalArgumentException;
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.crypto.CryptoProtocolVersion;
+ import org.apache.hadoop.fs.CanSetDropBehind;
+ import org.apache.hadoop.fs.CreateFlag;
+ import org.apache.hadoop.fs.FSOutputSummer;
+ import org.apache.hadoop.fs.FileAlreadyExistsException;
+ import org.apache.hadoop.fs.FileEncryptionInfo;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.ParentNotDirectoryException;
+ import org.apache.hadoop.fs.Syncable;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+ import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+ import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
++import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+ import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
+ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+ import org.apache.hadoop.hdfs.util.ByteArrayManager;
+ import org.apache.hadoop.io.EnumSetWritable;
+ import org.apache.hadoop.ipc.RemoteException;
+ import org.apache.hadoop.security.AccessControlException;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.hadoop.util.DataChecksum.Type;
+ import org.apache.hadoop.util.Progressable;
+ import org.apache.hadoop.util.Time;
+ import org.apache.htrace.core.TraceScope;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.Preconditions;
+
+
+ /****************************************************************
+ * DFSOutputStream creates files from a stream of bytes.
+ *
+ * The client application writes data that is cached internally by
+ * this stream. Data is broken up into packets, each packet is
+ * typically 64K in size. A packet comprises of chunks. Each chunk
+ * is typically 512 bytes and has an associated checksum with it.
+ *
+ * When a client application fills up the currentPacket, it is
+ * enqueued into the dataQueue of DataStreamer. DataStreamer is a
+ * thread that picks up packets from the dataQueue and sends it to
+ * the first datanode in the pipeline.
+ *
+ ****************************************************************/
+ @InterfaceAudience.Private
+ public class DFSOutputStream extends FSOutputSummer
+ implements Syncable, CanSetDropBehind {
+ static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
+ /**
+ * Number of times to retry creating a file when there are transient
+ * errors (typically related to encryption zones and KeyProvider operations).
+ */
+ @VisibleForTesting
+ static final int CREATE_RETRY_COUNT = 10;
+ @VisibleForTesting
+ static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
+ CryptoProtocolVersion.supported();
+
+ protected final DFSClient dfsClient;
+ protected final ByteArrayManager byteArrayManager;
+ // closed is accessed by different threads under different locks.
+ protected volatile boolean closed = false;
+
+ protected final String src;
+ protected final long fileId;
+ protected final long blockSize;
+ protected final int bytesPerChecksum;
+
+ protected DFSPacket currentPacket = null;
- private DataStreamer streamer;
++ protected DataStreamer streamer;
+ protected int packetSize = 0; // write packet size, not including the header.
+ protected int chunksPerPacket = 0;
+ protected long lastFlushOffset = 0; // offset when flush was invoked
+ private long initialFileSize = 0; // at time of file open
+ private final short blockReplication; // replication factor of file
+ protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+ protected final AtomicReference<CachingStrategy> cachingStrategy;
+ private FileEncryptionInfo fileEncryptionInfo;
+
+ /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
+ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+ long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
+ final byte[] buf;
+ final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+
+ try {
+ buf = byteArrayManager.newByteArray(bufferSize);
+ } catch (InterruptedException ie) {
+ final InterruptedIOException iioe = new InterruptedIOException(
+ "seqno=" + seqno);
+ iioe.initCause(ie);
+ throw iioe;
+ }
+
+ return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
- getChecksumSize(), lastPacketInBlock);
++ getChecksumSize(), lastPacketInBlock);
+ }
+
+ @Override
+ protected void checkClosed() throws IOException {
+ if (isClosed()) {
+ getStreamer().getLastException().throwException4Close();
+ }
+ }
+
+ //
+ // returns the list of targets, if any, that is being currently used.
+ //
+ @VisibleForTesting
+ public synchronized DatanodeInfo[] getPipeline() {
+ if (getStreamer().streamerClosed()) {
+ return null;
+ }
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
+ if (currentNodes == null) {
+ return null;
+ }
+ DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+ for (int i = 0; i < currentNodes.length; i++) {
+ value[i] = currentNodes[i];
+ }
+ return value;
+ }
+
- /**
++ /**
+ * @return the object for computing checksum.
+ * The type is NULL if checksum is not computed.
+ */
+ private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+ HdfsFileStatus stat) {
+ if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
+ // do not compute checksum for writing to single replica to memory
+ return DataChecksum.newDataChecksum(Type.NULL,
+ checksum.getBytesPerChecksum());
+ }
+ return checksum;
+ }
-
++
+ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
+ HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+ super(getChecksum4Compute(checksum, stat));
+ this.dfsClient = dfsClient;
+ this.src = src;
+ this.fileId = stat.getFileId();
+ this.blockSize = stat.getBlockSize();
+ this.blockReplication = stat.getReplication();
+ this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+ this.cachingStrategy = new AtomicReference<CachingStrategy>(
+ dfsClient.getDefaultWriteCachingStrategy());
+ if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug(
+ "Set non-null progress callback on DFSOutputStream " + src);
+ }
-
++
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum <= 0) {
+ throw new HadoopIllegalArgumentException(
+ "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+ }
+ if (blockSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ + ") must divide block size (=" + blockSize + ").");
+ }
+ this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
+ }
+
+ /** Construct a new output stream for creating a file. */
+ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
++ DataChecksum checksum, String[] favoredNodes, boolean createStreamer)
++ throws IOException {
+ this(dfsClient, src, progress, stat, checksum);
+ this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
+
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
+
- streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
- cachingStrategy, byteArrayManager, favoredNodes);
++ if (createStreamer) {
++ streamer = new DataStreamer(stat, null, dfsClient, src, progress,
++ checksum, cachingStrategy, byteArrayManager, favoredNodes);
++ }
+ }
+
+ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+ FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize, Progressable progress, int buffersize,
+ DataChecksum checksum, String[] favoredNodes) throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("newStreamForCreate", src);
+ try {
+ HdfsFileStatus stat = null;
+
+ // Retry the create if we get a RetryStartFileException up to a maximum
+ // number of times
+ boolean shouldRetry = true;
+ int retryCount = CREATE_RETRY_COUNT;
+ while (shouldRetry) {
+ shouldRetry = false;
+ try {
+ stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+ new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+ blockSize, SUPPORTED_CRYPTO_VERSIONS);
+ break;
+ } catch (RemoteException re) {
+ IOException e = re.unwrapRemoteException(
+ AccessControlException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ NSQuotaExceededException.class,
+ RetryStartFileException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class,
+ UnknownCryptoProtocolVersionException.class);
+ if (e instanceof RetryStartFileException) {
+ if (retryCount > 0) {
+ shouldRetry = true;
+ retryCount--;
+ } else {
+ throw new IOException("Too many retries because of encryption" +
+ " zone operations", e);
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
++ final DFSOutputStream out;
++ if(stat.getErasureCodingPolicy() != null) {
++ out = new DFSStripedOutputStream(dfsClient, src, stat,
++ flag, progress, checksum, favoredNodes);
++ } else {
++ out = new DFSOutputStream(dfsClient, src, stat,
++ flag, progress, checksum, favoredNodes, true);
++ }
+ out.start();
+ return out;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /** Construct a new output stream for append. */
+ private DFSOutputStream(DFSClient dfsClient, String src,
+ EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+ HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
- throws IOException {
++ throws IOException {
+ this(dfsClient, src, progress, stat, checksum);
+ initialFileSize = stat.getLen(); // length of file when opened
+ this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
+
+ boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
+
+ this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+
+ // The last partial block of the file has to be filled.
+ if (!toNewBlock && lastBlock != null) {
+ // indicate that we are appending to an existing block
+ streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
+ cachingStrategy, byteArrayManager);
+ getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
+ adjustPacketChunkSize(stat);
+ getStreamer().setPipelineInConstruction(lastBlock);
+ } else {
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
+ bytesPerChecksum);
+ streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
+ dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+ favoredNodes);
+ }
+ }
+
+ private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
+
+ long usedInLastBlock = stat.getLen() % blockSize;
+ int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+ // calculate the amount of free space in the pre-existing
+ // last crc chunk
+ int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+ int freeInCksum = bytesPerChecksum - usedInCksum;
+
+ // if there is space in the last block, then we have to
+ // append to that block
+ if (freeInLastBlock == blockSize) {
+ throw new IOException("The last block for file " +
+ src + " is full.");
+ }
+
+ if (usedInCksum > 0 && freeInCksum > 0) {
+ // if there is space in the last partial chunk, then
+ // setup in such a way that the next packet will have only
+ // one chunk that fills up the partial chunk.
+ //
+ computePacketChunkSize(0, freeInCksum);
+ setChecksumBufSize(freeInCksum);
+ getStreamer().setAppendChunk(true);
+ } else {
+ // if the remaining space in the block is smaller than
+ // that expected size of of a packet, then create
+ // smaller size packet.
+ //
+ computePacketChunkSize(
+ Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
+ bytesPerChecksum);
+ }
+ }
+
+ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
+ EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
+ LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
+ String[] favoredNodes) throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("newStreamForAppend", src);
++ if(stat.getErasureCodingPolicy() != null) {
++ throw new IOException("Not support appending to a striping layout file yet.");
++ }
+ try {
+ final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
+ progress, lastBlock, stat, checksum, favoredNodes);
+ out.start();
+ return out;
+ } finally {
+ scope.close();
+ }
+ }
+
+ protected void computePacketChunkSize(int psize, int csize) {
+ final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
+ final int chunkSize = csize + getChecksumSize();
+ chunksPerPacket = Math.max(bodySize/chunkSize, 1);
+ packetSize = chunkSize*chunksPerPacket;
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
+ ", chunkSize=" + chunkSize +
+ ", chunksPerPacket=" + chunksPerPacket +
+ ", packetSize=" + packetSize);
+ }
+ }
+
+ protected TraceScope createWriteTraceScope() {
+ return dfsClient.newPathTraceScope("DFSOutputStream#write", src);
+ }
+
+ // @see FSOutputSummer#writeChunk()
+ @Override
+ protected synchronized void writeChunk(byte[] b, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+
+ if (len > bytesPerChecksum) {
+ throw new IOException("writeChunk() buffer size is " + len +
+ " is larger than supported bytesPerChecksum " +
+ bytesPerChecksum);
+ }
+ if (cklen != 0 && cklen != getChecksumSize()) {
+ throw new IOException("writeChunk() checksum size is supposed to be " +
+ getChecksumSize() + " but found to be " + cklen);
+ }
+
+ if (currentPacket == null) {
+ currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+ .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
++ if (LOG.isDebugEnabled()) {
++ LOG.debug("WriteChunk allocating new packet seqno=" +
+ currentPacket.getSeqno() +
+ ", src=" + src +
+ ", packetSize=" + packetSize +
+ ", chunksPerPacket=" + chunksPerPacket +
- ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
++ ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
+ }
+ }
+
+ currentPacket.writeChecksum(checksum, ckoff, cklen);
+ currentPacket.writeData(b, offset, len);
+ currentPacket.incNumChunks();
+ getStreamer().incBytesCurBlock(len);
+
+ // If packet is full, enqueue it for transmission
- //
+ if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+ getStreamer().getBytesCurBlock() == blockSize) {
+ enqueueCurrentPacketFull();
+ }
+ }
+
+ void enqueueCurrentPacket() throws IOException {
+ getStreamer().waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+
+ void enqueueCurrentPacketFull() throws IOException {
+ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
- + " appendChunk={}, {}", currentPacket, src, getStreamer()
- .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
++ + " appendChunk={}, {}", currentPacket, src, getStreamer()
++ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+ getStreamer());
+ enqueueCurrentPacket();
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ /** create an empty packet to mark the end of the block. */
+ void setCurrentPacketToEmpty() throws InterruptedIOException {
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
+ }
+
+ /**
+ * If the reopened file did not end at chunk boundary and the above
+ * write filled up its partial chunk. Tell the summer to generate full
+ * crc chunks from now on.
+ */
+ protected void adjustChunkBoundary() {
+ if (getStreamer().getAppendChunk() &&
+ getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+ getStreamer().setAppendChunk(false);
+ resetChecksumBufSize();
+ }
+
+ if (!getStreamer().getAppendChunk()) {
+ int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
+ dfsClient.getConf().getWritePacketSize());
+ computePacketChunkSize(psize, bytesPerChecksum);
+ }
+ }
+
+ /**
+ * if encountering a block boundary, send an empty packet to
+ * indicate the end of block and reset bytesCurBlock.
+ *
+ * @throws IOException
+ */
- protected void endBlock() throws IOException {
++ void endBlock() throws IOException {
+ if (getStreamer().getBytesCurBlock() == blockSize) {
+ setCurrentPacketToEmpty();
+ enqueueCurrentPacket();
+ getStreamer().setBytesCurBlock(0);
+ lastFlushOffset = 0;
+ }
+ }
-
++
+ /**
+ * Flushes out to all replicas of the block. The data is in the buffers
+ * of the DNs but not necessarily in the DN's OS buffers.
+ *
+ * It is a synchronous operation. When it returns,
+ * it guarantees that flushed data become visible to new readers.
+ * It is not guaranteed that data has been flushed to
+ * persistent store on the datanode.
+ * Block allocations are persisted on namenode.
+ */
+ @Override
+ public void hflush() throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("hflush", src);
+ try {
+ flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+ } finally {
+ scope.close();
+ }
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("hsync", src);
+ try {
+ flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
+ } finally {
+ scope.close();
+ }
+ }
-
++
+ /**
+ * The expected semantics is all data have flushed out to all replicas
+ * and all replicas have done posix fsync equivalent - ie the OS has
+ * flushed it to the disk device (but the disk may have it in its cache).
- *
++ *
+ * Note that only the current block is flushed to the disk device.
+ * To guarantee durable sync across block boundaries the stream should
+ * be created with {@link CreateFlag#SYNC_BLOCK}.
- *
++ *
+ * @param syncFlags
+ * Indicate the semantic of the sync. Currently used to specify
+ * whether or not to update the block length in NameNode.
+ */
+ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("hsync", src);
+ try {
+ flushOrSync(true, syncFlags);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Flush/Sync buffered data to DataNodes.
- *
++ *
+ * @param isSync
+ * Whether or not to require all replicas to flush data to the disk
+ * device
+ * @param syncFlags
+ * Indicate extra detailed semantic of the flush/sync. Currently
+ * mainly used to specify whether or not to update the file length in
+ * the NameNode
+ * @throws IOException
+ */
+ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
+ throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+ try {
+ long toWaitFor;
+ long lastBlockLength = -1L;
+ boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+ boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
+ synchronized (this) {
+ // flush checksum buffer, but keep checksum buffer intact if we do not
+ // need to end the current block
+ int numKept = flushBuffer(!endBlock, true);
+ // bytesCurBlock potentially incremented if there was buffered data
+
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("DFSClient flush(): "
+ + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+ + " lastFlushOffset=" + lastFlushOffset
+ + " createNewBlock=" + endBlock);
+ }
+ // Flush only if we haven't already flushed till this offset.
+ if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+ assert getStreamer().getBytesCurBlock() > lastFlushOffset;
+ // record the valid offset of this flush
+ lastFlushOffset = getStreamer().getBytesCurBlock();
+ if (isSync && currentPacket == null && !endBlock) {
+ // Nothing to send right now,
+ // but sync was requested.
+ // Send an empty packet if we do not end the block right now
+ currentPacket = createPacket(packetSize, chunksPerPacket,
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
+ }
+ } else {
+ if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
+ // Nothing to send right now,
+ // and the block was partially written,
+ // and sync was requested.
+ // So send an empty sync packet if we do not end the block right
+ // now
+ currentPacket = createPacket(packetSize, chunksPerPacket,
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
+ } else if (currentPacket != null) {
+ // just discard the current packet since it is already been sent.
+ currentPacket.releaseBuffer(byteArrayManager);
+ currentPacket = null;
+ }
+ }
+ if (currentPacket != null) {
+ currentPacket.setSyncBlock(isSync);
+ enqueueCurrentPacket();
+ }
+ if (endBlock && getStreamer().getBytesCurBlock() > 0) {
+ // Need to end the current block, thus send an empty packet to
+ // indicate this is the end of the block and reset bytesCurBlock
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock || isSync);
+ enqueueCurrentPacket();
+ getStreamer().setBytesCurBlock(0);
+ lastFlushOffset = 0;
+ } else {
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ getStreamer().setBytesCurBlock(
+ getStreamer().getBytesCurBlock() - numKept);
+ }
+
+ toWaitFor = getStreamer().getLastQueuedSeqno();
+ } // end synchronized
+
+ getStreamer().waitForAckedSeqno(toWaitFor);
+
+ // update the block length first time irrespective of flag
+ if (updateLength || getStreamer().getPersistBlocks().get()) {
+ synchronized (this) {
+ if (!getStreamer().streamerClosed()
+ && getStreamer().getBlock() != null) {
+ lastBlockLength = getStreamer().getBlock().getNumBytes();
+ }
+ }
+ }
+ // If 1) any new blocks were allocated since the last flush, or 2) to
+ // update length in NN is required, then persist block locations on
+ // namenode.
+ if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
+ try {
+ dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+ lastBlockLength);
+ } catch (IOException ioe) {
+ DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+ // If we got an error here, it might be because some other thread called
+ // close before our hflush completed. In that case, we should throw an
+ // exception that the stream is closed.
+ checkClosed();
+ // If we aren't closed but failed to sync, we should expose that to the
+ // caller.
+ throw ioe;
+ }
+ }
+
+ synchronized(this) {
+ if (!getStreamer().streamerClosed()) {
+ getStreamer().setHflush();
+ }
+ }
+ } catch (InterruptedIOException interrupt) {
+ // This kind of error doesn't mean that the stream itself is broken - just the
+ // flushing thread got interrupted. So, we shouldn't close down the writer,
+ // but instead just propagate the error
+ throw interrupt;
+ } catch (IOException e) {
+ DFSClient.LOG.warn("Error while syncing", e);
+ synchronized (this) {
+ if (!isClosed()) {
+ getStreamer().getLastException().set(e);
+ closeThreads(true);
+ }
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
+ */
+ @Deprecated
+ public synchronized int getNumCurrentReplicas() throws IOException {
+ return getCurrentBlockReplication();
+ }
+
+ /**
+ * Note that this is not a public API;
+ * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
- *
++ *
+ * @return the number of valid replicas of the current block
+ */
+ public synchronized int getCurrentBlockReplication() throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+ if (getStreamer().streamerClosed()) {
+ return blockReplication; // no pipeline, return repl factor of file
+ }
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
+ if (currentNodes == null) {
+ return blockReplication; // no pipeline, return repl factor of file
+ }
+ return currentNodes.length;
+ }
-
++
+ /**
+ * Waits till all existing data is flushed and confirmations
+ * received from datanodes.
+ */
+ protected void flushInternal() throws IOException {
+ long toWaitFor;
+ synchronized (this) {
+ dfsClient.checkOpen();
+ checkClosed();
+ //
+ // If there is data in the current buffer, send it across
+ //
+ getStreamer().queuePacket(currentPacket);
+ currentPacket = null;
+ toWaitFor = getStreamer().getLastQueuedSeqno();
+ }
+
+ getStreamer().waitForAckedSeqno(toWaitFor);
+ }
+
+ protected synchronized void start() {
+ getStreamer().start();
+ }
-
++
+ /**
+ * Aborts this output stream and releases any system
+ * resources associated with this stream.
+ */
+ synchronized void abort() throws IOException {
+ if (isClosed()) {
+ return;
+ }
+ getStreamer().getLastException().set(new IOException("Lease timeout of "
+ + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
+ closeThreads(true);
+ dfsClient.endFileLease(fileId);
+ }
+
+ boolean isClosed() {
+ return closed || getStreamer().streamerClosed();
+ }
+
+ void setClosed() {
+ closed = true;
+ getStreamer().release();
+ }
+
+ // shutdown datastreamer and responseprocessor threads.
+ // interrupt datastreamer if force is true
+ protected void closeThreads(boolean force) throws IOException {
+ try {
+ getStreamer().close(force);
+ getStreamer().join();
+ getStreamer().closeSocket();
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to shutdown streamer");
+ } finally {
+ getStreamer().setSocketToNull();
+ setClosed();
+ }
+ }
-
++
+ /**
+ * Closes this output stream and releases any system
+ * resources associated with this stream.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ TraceScope scope =
+ dfsClient.newPathTraceScope("DFSOutputStream#close", src);
+ try {
+ closeImpl();
+ } finally {
+ scope.close();
+ }
+ }
+
+ protected synchronized void closeImpl() throws IOException {
+ if (isClosed()) {
+ getStreamer().getLastException().check(true);
+ return;
+ }
+
+ try {
+ flushBuffer(); // flush from all upper layers
+
+ if (currentPacket != null) {
+ enqueueCurrentPacket();
+ }
+
+ if (getStreamer().getBytesCurBlock() != 0) {
+ setCurrentPacketToEmpty();
+ }
+
+ flushInternal(); // flush all data to Datanodes
+ // get last block before destroying the streamer
+ ExtendedBlock lastBlock = getStreamer().getBlock();
+ closeThreads(false);
+ TraceScope scope = dfsClient.getTracer().newScope("completeFile");
+ try {
+ completeFile(lastBlock);
+ } finally {
+ scope.close();
+ }
+ dfsClient.endFileLease(fileId);
+ } catch (ClosedChannelException e) {
+ } finally {
+ setClosed();
+ }
+ }
+
+ // should be called holding (this) lock since setTestFilename() may
+ // be called during unit tests
+ protected void completeFile(ExtendedBlock last) throws IOException {
+ long localstart = Time.monotonicNow();
+ final DfsClientConf conf = dfsClient.getConf();
+ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+ boolean fileComplete = false;
+ int retries = conf.getNumBlockWriteLocateFollowingRetry();
+ while (!fileComplete) {
+ fileComplete =
+ dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
+ if (!fileComplete) {
+ final int hdfsTimeout = conf.getHdfsTimeout();
+ if (!dfsClient.clientRunning
+ || (hdfsTimeout > 0
+ && localstart + hdfsTimeout < Time.monotonicNow())) {
+ String msg = "Unable to close file because dfsclient " +
+ " was unable to contact the HDFS servers." +
+ " clientRunning " + dfsClient.clientRunning +
+ " hdfsTimeout " + hdfsTimeout;
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ }
+ try {
+ if (retries == 0) {
+ throw new IOException("Unable to close file because the last block"
+ + " does not have enough number of replicas.");
+ }
+ retries--;
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ if (Time.monotonicNow() - localstart > 5000) {
+ DFSClient.LOG.info("Could not complete " + src + " retrying...");
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void setArtificialSlowdown(long period) {
+ getStreamer().setArtificialSlowdown(period);
+ }
+
+ @VisibleForTesting
+ public synchronized void setChunksPerPacket(int value) {
+ chunksPerPacket = Math.min(chunksPerPacket, value);
+ packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
+ }
+
+ /**
+ * Returns the size of a file as it was when this stream was opened
+ */
+ public long getInitialLen() {
+ return initialFileSize;
+ }
+
+ /**
+ * @return the FileEncryptionInfo for this stream, or null if not encrypted.
+ */
+ public FileEncryptionInfo getFileEncryptionInfo() {
+ return fileEncryptionInfo;
+ }
+
+ /**
+ * Returns the access token currently used by streamer, for testing only
+ */
+ synchronized Token<BlockTokenIdentifier> getBlockToken() {
+ return getStreamer().getBlockToken();
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropBehind) throws IOException {
+ CachingStrategy prevStrategy, nextStrategy;
+ // CachingStrategy is immutable. So build a new CachingStrategy with the
+ // modifications we want, and compare-and-swap it in.
+ do {
+ prevStrategy = this.cachingStrategy.get();
+ nextStrategy = new CachingStrategy.Builder(prevStrategy).
- setDropBehind(dropBehind).build();
++ setDropBehind(dropBehind).build();
+ } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
+ }
+
+ @VisibleForTesting
+ ExtendedBlock getBlock() {
+ return getStreamer().getBlock();
+ }
+
+ @VisibleForTesting
+ public long getFileId() {
+ return fileId;
+ }
+
+ /**
+ * Return the source of stream.
+ */
+ String getSrc() {
+ return src;
+ }
+
+ /**
+ * Returns the data streamer object.
+ */
+ protected DataStreamer getStreamer() {
+ return streamer;
+ }
++
++ @Override
++ public String toString() {
++ return getClass().getSimpleName() + ":" + streamer;
++ }
++
++ static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient,
++ String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes)
++ throws IOException {
++ final DfsClientConf conf = dfsClient.getConf();
++ int retries = conf.getNumBlockWriteLocateFollowingRetry();
++ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
++ long localstart = Time.monotonicNow();
++ while (true) {
++ try {
++ return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
++ excludedNodes, fileId, favoredNodes);
++ } catch (RemoteException e) {
++ IOException ue = e.unwrapRemoteException(FileNotFoundException.class,
++ AccessControlException.class,
++ NSQuotaExceededException.class,
++ DSQuotaExceededException.class,
++ QuotaByStorageTypeExceededException.class,
++ UnresolvedPathException.class);
++ if (ue != e) {
++ throw ue; // no need to retry these exceptions
++ }
++ if (NotReplicatedYetException.class.getName().equals(e.getClassName())) {
++ if (retries == 0) {
++ throw e;
++ } else {
++ --retries;
++ LOG.info("Exception while adding a block", e);
++ long elapsed = Time.monotonicNow() - localstart;
++ if (elapsed > 5000) {
++ LOG.info("Waiting for replication for " + (elapsed / 1000)
++ + " seconds");
++ }
++ try {
++ LOG.warn("NotReplicatedYetException sleeping " + src
++ + " retries left " + retries);
++ Thread.sleep(sleeptime);
++ sleeptime *= 2;
++ } catch (InterruptedException ie) {
++ LOG.warn("Caught exception", ie);
++ }
++ }
++ } else {
++ throw e;
++ }
++ }
++ }
++ }
+ }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fd55202/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 0000000,9a8ca6f..191691b
mode 000000,100755..100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@@ -1,0 -1,350 +1,364 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.nio.BufferOverflowException;
++import java.nio.ByteBuffer;
+ import java.nio.channels.ClosedChannelException;
+ import java.util.Arrays;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.util.ByteArrayManager;
+ import org.apache.htrace.core.Span;
+ import org.apache.htrace.core.SpanId;
+ import org.apache.htrace.core.TraceScope;
+
+ /****************************************************************
+ * DFSPacket is used by DataStreamer and DFSOutputStream.
+ * DFSOutputStream generates packets and then asks DataStreamer
+ * to send them to datanodes.
+ ****************************************************************/
+
+ @InterfaceAudience.Private
-class DFSPacket {
++public class DFSPacket {
+ public static final long HEART_BEAT_SEQNO = -1L;
+ private static SpanId[] EMPTY = new SpanId[0];
+ private final long seqno; // sequence number of buffer in block
+ private final long offsetInBlock; // offset in block
+ private boolean syncBlock; // this packet forces the current block to disk
+ private int numChunks; // number of chunks currently in packet
+ private final int maxChunks; // max chunks in packet
+ private byte[] buf;
+ private final boolean lastPacketInBlock; // is this the last packet in block?
+
+ /**
+ * The position fields below index into buf as follows
+ * (C is checksum data, D is payload data):
+ *
+ * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+ *           ^        ^               ^               ^
+ *           |        checksumPos     dataStart       dataPos
+ *           checksumStart
+ *
+ * Right before sending, we move the checksum data to immediately precede
+ * the actual data, and then insert the header into the buffer immediately
+ * preceding the checksum data, so we make sure to keep enough space in
+ * front of the checksum data to support the largest conceivable header.
+ */
+ private int checksumStart;
+ private int checksumPos;
+ private final int dataStart;
+ private int dataPos;
+ private SpanId[] traceParents = EMPTY;
+ private int traceParentsUsed;
+ private TraceScope scope;
+
+ /**
+ * Create a new, empty packet backed by the caller-supplied buffer.
+ *
+ * The buffer is stored by reference, not copied. The checksum region
+ * starts at PacketHeader.PKT_MAX_HEADER_LEN and the data region starts
+ * chunksPerPkt * checksumSize bytes after that; both regions start out
+ * empty (checksumPos == checksumStart, dataPos == dataStart).
+ *
+ * @param buf the buffer storing data and checksums
+ * @param chunksPerPkt maximum number of chunks per packet.
+ * @param offsetInBlock offset in bytes into the HDFS block.
+ * @param seqno the sequence number of this packet
+ * @param checksumSize the size of a checksum; multiplied by chunksPerPkt
+ *          to size the checksum region
+ * @param lastPacketInBlock if this is the last packet
+ */
- DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
++ public DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+ int checksumSize, boolean lastPacketInBlock) {
+ this.lastPacketInBlock = lastPacketInBlock;
+ this.numChunks = 0;
+ this.offsetInBlock = offsetInBlock;
+ this.seqno = seqno;
+
+ this.buf = buf;
+
+ checksumStart = PacketHeader.PKT_MAX_HEADER_LEN; // leave room in front for the header
+ checksumPos = checksumStart;
+ dataStart = checksumStart + (chunksPerPkt * checksumSize); // data follows the full checksum region
+ dataPos = dataStart;
+ maxChunks = chunksPerPkt;
+ }
+
+ /**
+ * Write data to this packet by appending it at the current data position.
+ *
+ * The capacity check is done before any byte is copied, so the packet is
+ * left unchanged when BufferOverflowException is thrown.
+ *
+ * @param inarray input array of data
+ * @param off the offset of data to write
+ * @param len the length of data to write
+ * @throws ClosedChannelException if the packet's buffer is null
+ *           (see checkBuffer())
+ * @throws BufferOverflowException if dataPos + len exceeds the length of
+ *           the packet's buffer
+ */
+ synchronized void writeData(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, dataPos, len);
+ dataPos += len;
+ }
+ /**
++ * Write data from a ByteBuffer to this packet, appending it at the
++ * current data position.
++ *
++ * At most len bytes are copied; if inBuffer has fewer than len bytes
++ * remaining, only the remaining bytes are copied. Bytes are read with the
++ * relative ByteBuffer.get(), so inBuffer's position advances by the
++ * number of bytes copied.
++ *
++ * NOTE(review): a negative len is not rejected. The copy loop is skipped
++ * and dataPos is then moved backwards by len -- confirm callers never
++ * pass a negative value.
++ *
++ * @param inBuffer the buffer to read data from
++ * @param len the maximum number of bytes to write
++ * @throws ClosedChannelException if the packet's buffer is null
++ *           (see checkBuffer())
++ * @throws BufferOverflowException if the (clamped) length does not fit in
++ *           the packet's buffer after dataPos
++ */
++ public synchronized void writeData(ByteBuffer inBuffer, int len)
++ throws ClosedChannelException {
++ checkBuffer();
++ len = len > inBuffer.remaining() ? inBuffer.remaining() : len; // clamp to what inBuffer holds
++ if (dataPos + len > buf.length) {
++ throw new BufferOverflowException();
++ }
++ for (int i = 0; i < len; i++) {
++ buf[dataPos + i] = inBuffer.get();
++ }
++ dataPos += len;
++ }
++
+ /**
+ * Write checksums to this packet by appending them at the current
+ * checksum position.
+ *
+ * A zero len returns without copying, but only after the buffer check, so
+ * ClosedChannelException is still thrown when the buffer is null.
+ *
+ * @param inarray input array of checksums
+ * @param off the offset of checksums to write
+ * @param len the length of checksums to write
+ * @throws ClosedChannelException if the packet's buffer is null
+ *           (see checkBuffer())
+ * @throws BufferOverflowException if the checksums would run past
+ *           dataStart, i.e. into the data region
+ */
- synchronized void writeChecksum(byte[] inarray, int off, int len)
++ public synchronized void writeChecksum(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (len == 0) {
+ return;
+ }
+ if (checksumPos + len > dataStart) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, checksumPos, len);
+ checksumPos += len;
+ }
+
+ /**
+ * Write the full packet, including the header, to the given output stream.
+ *
+ * The packet is made contiguous inside buf before it is written: the
+ * checksum bytes are moved so they end at dataStart (when they do not
+ * already), and the serialized header is copied in directly in front of
+ * them. As a side effect checksumStart and checksumPos are updated to the
+ * moved location. The header, checksums and data then go out in a single
+ * write call.
+ *
+ * @param stm the stream the packet bytes are written to
+ * @throws ClosedChannelException if the packet's buffer is null
+ *           (see checkBuffer())
+ * @throws IOException if writing to stm fails
+ */
- synchronized void writeTo(DataOutputStream stm) throws IOException {
++ public synchronized void writeTo(DataOutputStream stm) throws IOException {
+ checkBuffer();
+
+ final int dataLen = dataPos - dataStart;
+ final int checksumLen = checksumPos - checksumStart;
+ final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+
+ if (checksumPos != dataStart) {
+ // Move the checksum to cover the gap. This can happen for the last
+ // packet or during an hflush/hsync call.
+ System.arraycopy(buf, checksumStart, buf,
+ dataStart - checksumLen , checksumLen);
+ checksumPos = dataStart;
+ checksumStart = checksumPos - checksumLen;
+ }
+
+ final int headerStart = checksumStart - header.getSerializedSize();
+ assert checksumStart + 1 >= header.getSerializedSize();
+ assert headerStart >= 0;
+ assert headerStart + header.getSerializedSize() == checksumStart;
+
+ // Copy the header data into the buffer immediately preceding the checksum
+ // data.
+ System.arraycopy(header.getBytes(), 0, buf, headerStart,
+ header.getSerializedSize());
+
+ // corrupt the data for testing: flip every bit of the packet's last byte.
+ if (DFSClientFaultInjector.get().corruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+
+ // Write the now contiguous full packet to the output stream.
+ stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+ // undo corruption: the same XOR applied again restores the original byte.
+ if (DFSClientFaultInjector.get().uncorruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+ }
+ /**
+ * Verify that this packet still has a buffer.
+ *
+ * The buffer is set to null by releaseBuffer(ByteArrayManager).
+ *
+ * @throws ClosedChannelException if buf is null
+ */
+ private synchronized void checkBuffer() throws ClosedChannelException {
+ if (buf == null) {
+ throw new ClosedChannelException();
+ }
+ }
+
+ /**
+ * Release the buffer in this packet to ByteArrayManager.
+ *
+ * The packet's buffer reference is set to null afterwards, so later calls
+ * that go through checkBuffer() throw ClosedChannelException.
+ *
+ * NOTE(review): there is no guard against a second call; it would pass
+ * null to bam.release -- confirm the manager tolerates that or that
+ * callers release only once.
+ *
+ * @param bam the manager the buffer is handed back to
+ */
+ synchronized void releaseBuffer(ByteArrayManager bam) {
+ bam.release(buf);
+ buf = null;
+ }
+
+ /**
+ * Get the offset in the block just past the last data byte held by this
+ * packet.
+ *
+ * This is offsetInBlock plus the number of data bytes written so far
+ * (dataPos - dataStart), so for a packet with no data it equals
+ * offsetInBlock.
+ *
+ * @return offsetInBlock plus the number of data bytes in this packet
+ */
+ synchronized long getLastByteOffsetBlock() {
+ return offsetInBlock + dataPos - dataStart;
+ }
+
+ /**
+ * Check whether this packet is a heartbeat packet, identified solely by
+ * its sequence number.
+ *
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
+ /**
+ * Check whether this packet is the last packet in its block, as given to
+ * the constructor.
+ *
+ * @return true if the packet is the last packet
+ */
- boolean isLastPacketInBlock(){
++ boolean isLastPacketInBlock() {
+ return lastPacketInBlock;
+ }
+
+ /**
+ * Get the sequence number of this packet, as given to the constructor.
+ *
+ * @return the sequence number of this packet
+ */
- long getSeqno(){
++ long getSeqno() {
+ return seqno;
+ }
+
+ /**
+ * Get the number of chunks this packet contains. The count starts at zero
+ * and is advanced only by incNumChunks().
+ *
+ * @return the number of chunks in this packet
+ */
- synchronized int getNumChunks(){
++ synchronized int getNumChunks() {
+ return numChunks;
+ }
+
+ /**
+ * Increase the number of chunks by one.
+ *
+ * The count is not checked against the maximum returned by
+ * getMaxChunks().
+ */
- synchronized void incNumChunks(){
++ synchronized void incNumChunks() {
+ numChunks++;
+ }
+
+ /**
+ * Get the maximum number of chunks this packet can hold, i.e. the
+ * chunksPerPkt value given to the constructor.
+ *
+ * @return the maximum number of chunks in this packet
+ */
- int getMaxChunks(){
++ int getMaxChunks() {
+ return maxChunks;
+ }
+
+ /**
+ * Set whether this packet should sync the block. The flag is passed to
+ * the PacketHeader built in writeTo(DataOutputStream).
+ *
+ * @param syncBlock if to sync block
+ */
- synchronized void setSyncBlock(boolean syncBlock){
++ synchronized void setSyncBlock(boolean syncBlock) {
+ this.syncBlock = syncBlock;
+ }
+ /**
+ * Describe this packet by its sequence number, offset in the block,
+ * last-packet flag and the value of getLastByteOffsetBlock().
+ *
+ * @return a one-line description of this packet
+ */
+ @Override
+ public String toString() {
+ return "packet seqno: " + this.seqno +
+ " offsetInBlock: " + this.offsetInBlock +
+ " lastPacketInBlock: " + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock(); // synchronized getter
+ }
+
+ /**
+ * Add a trace parent span for this packet.
+ * <p>
+ * Trace parent spans for a packet are the trace spans responsible for
+ * adding data to that packet. They are stored as an array of SpanId that
+ * grows on demand.
+ * <p>
+ * A null span is ignored.
+ * <p>
+ * Protected by the DFSOutputStream dataQueue lock.
+ *
+ * @param span the span whose id is recorded; may be null
+ */
+ public void addTraceParent(Span span) {
+ if (span == null) {
+ return;
+ }
+ addTraceParent(span.getSpanId());
+ }
+ /**
+ * Add a trace parent span id for this packet.
+ *
+ * Ids for which isValid() returns false are ignored. Duplicates are kept
+ * here and removed later by getTraceParents(). The backing array grows
+ * to 8 entries on first use and doubles whenever it is full.
+ *
+ * NOTE(review): id is dereferenced without a null check, unlike the
+ * Span overload -- confirm callers never pass null.
+ *
+ * @param id the span id to record
+ */
+ public void addTraceParent(SpanId id) {
+ if (!id.isValid()) {
+ return;
+ }
+ if (traceParentsUsed == traceParents.length) {
+ int newLength = (traceParents.length == 0) ? 8 :
+ traceParents.length * 2;
+ traceParents = Arrays.copyOf(traceParents, newLength);
+ }
+ traceParents[traceParentsUsed] = id;
+ traceParentsUsed++;
+ }
+
+ /**
+ * Get the trace parent spans for this packet.
+ * <p>
+ * Will always be non-null.
+ * <p>
+ * This call is not a pure getter: it sorts the recorded ids in place,
+ * drops duplicates, and trims the backing array to exactly the number of
+ * distinct ids. The array returned is the packet's own backing array, not
+ * a copy.
+ * <p>
+ * Protected by the DFSOutputStream dataQueue lock.
+ *
+ * @return the sorted, de-duplicated trace parent ids
+ */
+ public SpanId[] getTraceParents() {
+ // Remove duplicates from the array.
+ int len = traceParentsUsed;
+ Arrays.sort(traceParents, 0, len); // only the used prefix; equal ids become adjacent
+ int i = 0, j = 0; // i reads every used slot, j is the next slot to keep
+ SpanId prevVal = SpanId.INVALID;
+ while (true) {
+ if (i == len) {
+ break;
+ }
+ SpanId val = traceParents[i];
+ if (!val.equals(prevVal)) {
+ traceParents[j] = val;
+ j++;
+ prevVal = val;
+ }
+ i++;
+ }
+ if (j < traceParents.length) {
+ // Trim spare capacity and dropped duplicates.
+ traceParents = Arrays.copyOf(traceParents, j);
+ traceParentsUsed = traceParents.length;
+ }
+ return traceParents;
+ }
+ /**
+ * Set the trace scope associated with this packet.
+ *
+ * @param scope the scope to store; returned as-is by getTraceScope()
+ */
+ public void setTraceScope(TraceScope scope) {
+ this.scope = scope;
+ }
+ /**
+ * Get the trace scope associated with this packet.
+ *
+ * @return the scope last passed to setTraceScope(TraceScope), or null if
+ *         none has been set
+ */
+ public TraceScope getTraceScope() {
+ return scope;
+ }
+ }