Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/30 17:41:09 UTC
[09/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
new file mode 100755
index 0000000..de1d1ee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -0,0 +1,918 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/****************************************************************
+ * DFSOutputStream creates files from a stream of bytes.
+ *
+ * The client application writes data that is cached internally by
+ * this stream. Data is broken up into packets, each typically
+ * 64K in size. A packet comprises chunks, each typically
+ * 512 bytes with an associated checksum.
+ *
+ * When a client application fills up the currentPacket, it is
+ * enqueued into the dataQueue of DataStreamer. DataStreamer is a
+ * thread that picks up packets from the dataQueue and sends them to
+ * the first datanode in the pipeline.
+ *
+ ****************************************************************/
+@InterfaceAudience.Private
+public class DFSOutputStream extends FSOutputSummer
+ implements Syncable, CanSetDropBehind {
+ static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
+ /**
+ * Number of times to retry creating a file when there are transient
+ * errors (typically related to encryption zones and KeyProvider operations).
+ */
+ @VisibleForTesting
+ static final int CREATE_RETRY_COUNT = 10;
+ @VisibleForTesting
+ static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
+ CryptoProtocolVersion.supported();
+
+ protected final DFSClient dfsClient;
+ protected final ByteArrayManager byteArrayManager;
+ // closed is accessed by different threads under different locks.
+ protected volatile boolean closed = false;
+
+ protected final String src;
+ protected final long fileId;
+ protected final long blockSize;
+ protected final int bytesPerChecksum;
+
+ protected DFSPacket currentPacket = null;
+ private DataStreamer streamer;
+ protected int packetSize = 0; // write packet size, not including the header.
+ protected int chunksPerPacket = 0;
+ protected long lastFlushOffset = 0; // offset when flush was invoked
+ private long initialFileSize = 0; // at time of file open
+ private final short blockReplication; // replication factor of file
+ protected boolean shouldSyncBlock = false; // force blocks to disk upon close
+ protected final AtomicReference<CachingStrategy> cachingStrategy;
+ private FileEncryptionInfo fileEncryptionInfo;
+
+ /** Use {@link ByteArrayManager} to create buffers for non-heartbeat packets. */
+ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
+ long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
+ final byte[] buf;
+ final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
+
+ try {
+ buf = byteArrayManager.newByteArray(bufferSize);
+ } catch (InterruptedException ie) {
+ final InterruptedIOException iioe = new InterruptedIOException(
+ "seqno=" + seqno);
+ iioe.initCause(ie);
+ throw iioe;
+ }
+
+ return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
+ getChecksumSize(), lastPacketInBlock);
+ }
+
+ @Override
+ protected void checkClosed() throws IOException {
+ if (isClosed()) {
+ getStreamer().getLastException().throwException4Close();
+ }
+ }
+
+ //
+ // Returns the list of targets, if any, that are currently being used.
+ //
+ @VisibleForTesting
+ public synchronized DatanodeInfo[] getPipeline() {
+ if (getStreamer().streamerClosed()) {
+ return null;
+ }
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
+ if (currentNodes == null) {
+ return null;
+ }
+ DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
+ for (int i = 0; i < currentNodes.length; i++) {
+ value[i] = currentNodes[i];
+ }
+ return value;
+ }
+
+ /**
+ * @return the object for computing checksum.
+ * The type is NULL if checksum is not computed.
+ */
+ private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+ HdfsFileStatus stat) {
+ if (DataStreamer.isLazyPersist(stat) && stat.getReplication() == 1) {
+ // do not compute checksum for writing to single replica to memory
+ return DataChecksum.newDataChecksum(Type.NULL,
+ checksum.getBytesPerChecksum());
+ }
+ return checksum;
+ }
+
+ private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
+ HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+ super(getChecksum4Compute(checksum, stat));
+ this.dfsClient = dfsClient;
+ this.src = src;
+ this.fileId = stat.getFileId();
+ this.blockSize = stat.getBlockSize();
+ this.blockReplication = stat.getReplication();
+ this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+ this.cachingStrategy = new AtomicReference<CachingStrategy>(
+ dfsClient.getDefaultWriteCachingStrategy());
+ if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug(
+ "Set non-null progress callback on DFSOutputStream " + src);
+ }
+
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum <= 0) {
+ throw new HadoopIllegalArgumentException(
+ "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
+ }
+ if (blockSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + HdfsClientConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ + ") must divide block size (=" + blockSize + ").");
+ }
+ this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
+ }
+
+ /** Construct a new output stream for creating a file. */
+ protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+ EnumSet<CreateFlag> flag, Progressable progress,
+ DataChecksum checksum, String[] favoredNodes) throws IOException {
+ this(dfsClient, src, progress, stat, checksum);
+ this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
+
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
+
+ streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
+ cachingStrategy, byteArrayManager, favoredNodes);
+ }
+
+ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+ FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize, Progressable progress, int buffersize,
+ DataChecksum checksum, String[] favoredNodes) throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("newStreamForCreate", src);
+ try {
+ HdfsFileStatus stat = null;
+
+ // Retry the create if we get a RetryStartFileException up to a maximum
+ // number of times
+ boolean shouldRetry = true;
+ int retryCount = CREATE_RETRY_COUNT;
+ while (shouldRetry) {
+ shouldRetry = false;
+ try {
+ stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+ new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+ blockSize, SUPPORTED_CRYPTO_VERSIONS);
+ break;
+ } catch (RemoteException re) {
+ IOException e = re.unwrapRemoteException(
+ AccessControlException.class,
+ DSQuotaExceededException.class,
+ QuotaByStorageTypeExceededException.class,
+ FileAlreadyExistsException.class,
+ FileNotFoundException.class,
+ ParentNotDirectoryException.class,
+ NSQuotaExceededException.class,
+ RetryStartFileException.class,
+ SafeModeException.class,
+ UnresolvedPathException.class,
+ SnapshotAccessControlException.class,
+ UnknownCryptoProtocolVersionException.class);
+ if (e instanceof RetryStartFileException) {
+ if (retryCount > 0) {
+ shouldRetry = true;
+ retryCount--;
+ } else {
+ throw new IOException("Too many retries because of encryption" +
+ " zone operations", e);
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
+ final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
+ flag, progress, checksum, favoredNodes);
+ out.start();
+ return out;
+ } finally {
+ scope.close();
+ }
+ }
+
+ /** Construct a new output stream for append. */
+ private DFSOutputStream(DFSClient dfsClient, String src,
+ EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
+ HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
+ throws IOException {
+ this(dfsClient, src, progress, stat, checksum);
+ initialFileSize = stat.getLen(); // length of file when opened
+ this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
+
+ boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
+
+ this.fileEncryptionInfo = stat.getFileEncryptionInfo();
+
+ // The last partial block of the file has to be filled.
+ if (!toNewBlock && lastBlock != null) {
+ // indicate that we are appending to an existing block
+ streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum,
+ cachingStrategy, byteArrayManager);
+ getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
+ adjustPacketChunkSize(stat);
+ getStreamer().setPipelineInConstruction(lastBlock);
+ } else {
+ computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
+ bytesPerChecksum);
+ streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
+ dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+ favoredNodes);
+ }
+ }
+
+ private void adjustPacketChunkSize(HdfsFileStatus stat) throws IOException{
+
+ long usedInLastBlock = stat.getLen() % blockSize;
+ int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+ // calculate the amount of free space in the pre-existing
+ // last crc chunk
+ int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+ int freeInCksum = bytesPerChecksum - usedInCksum;
+
+ // if there is space in the last block, then we have to
+ // append to that block
+ if (freeInLastBlock == blockSize) {
+ throw new IOException("The last block for file " +
+ src + " is full.");
+ }
+
+ if (usedInCksum > 0 && freeInCksum > 0) {
+ // if there is space in the last partial chunk, then
+ // set up in such a way that the next packet will have only
+ // one chunk that fills up the partial chunk.
+ //
+ computePacketChunkSize(0, freeInCksum);
+ setChecksumBufSize(freeInCksum);
+ getStreamer().setAppendChunk(true);
+ } else {
+ // if the remaining space in the block is smaller than
+ // the expected size of a packet, then create a
+ // smaller packet.
+ //
+ computePacketChunkSize(
+ Math.min(dfsClient.getConf().getWritePacketSize(), freeInLastBlock),
+ bytesPerChecksum);
+ }
+ }
+
+ static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
+ EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
+ LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
+ String[] favoredNodes) throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("newStreamForAppend", src);
+ try {
+ final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
+ progress, lastBlock, stat, checksum, favoredNodes);
+ out.start();
+ return out;
+ } finally {
+ scope.close();
+ }
+ }
+
+ protected void computePacketChunkSize(int psize, int csize) {
+ final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
+ final int chunkSize = csize + getChecksumSize();
+ chunksPerPacket = Math.max(bodySize/chunkSize, 1);
+ packetSize = chunkSize*chunksPerPacket;
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
+ ", chunkSize=" + chunkSize +
+ ", chunksPerPacket=" + chunksPerPacket +
+ ", packetSize=" + packetSize);
+ }
+ }
+
+ protected TraceScope createWriteTraceScope() {
+ return dfsClient.getPathTraceScope("DFSOutputStream#write", src);
+ }
+
+ // @see FSOutputSummer#writeChunk()
+ @Override
+ protected synchronized void writeChunk(byte[] b, int offset, int len,
+ byte[] checksum, int ckoff, int cklen) throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+
+ if (len > bytesPerChecksum) {
+ throw new IOException("writeChunk() buffer size is " + len +
+ " is larger than supported bytesPerChecksum " +
+ bytesPerChecksum);
+ }
+ if (cklen != 0 && cklen != getChecksumSize()) {
+ throw new IOException("writeChunk() checksum size is supposed to be " +
+ getChecksumSize() + " but found to be " + cklen);
+ }
+
+ if (currentPacket == null) {
+ currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
+ .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
+ currentPacket.getSeqno() +
+ ", src=" + src +
+ ", packetSize=" + packetSize +
+ ", chunksPerPacket=" + chunksPerPacket +
+ ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
+ }
+ }
+
+ currentPacket.writeChecksum(checksum, ckoff, cklen);
+ currentPacket.writeData(b, offset, len);
+ currentPacket.incNumChunks();
+ getStreamer().incBytesCurBlock(len);
+
+ // If packet is full, enqueue it for transmission
+ //
+ if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
+ getStreamer().getBytesCurBlock() == blockSize) {
+ enqueueCurrentPacketFull();
+ }
+ }
+
+ void enqueueCurrentPacket() throws IOException {
+ getStreamer().waitAndQueuePacket(currentPacket);
+ currentPacket = null;
+ }
+
+ void enqueueCurrentPacketFull() throws IOException {
+ LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ + " appendChunk={}, {}", currentPacket, src, getStreamer()
+ .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
+ getStreamer());
+ enqueueCurrentPacket();
+ adjustChunkBoundary();
+ endBlock();
+ }
+
+ /** create an empty packet to mark the end of the block. */
+ void setCurrentPacketToEmpty() throws InterruptedIOException {
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock);
+ }
+
+ /**
+ * If the reopened file did not end at a chunk boundary and the above
+ * write filled up its partial chunk, tell the summer to generate full
+ * crc chunks from now on.
+ */
+ protected void adjustChunkBoundary() {
+ if (getStreamer().getAppendChunk() &&
+ getStreamer().getBytesCurBlock() % bytesPerChecksum == 0) {
+ getStreamer().setAppendChunk(false);
+ resetChecksumBufSize();
+ }
+
+ if (!getStreamer().getAppendChunk()) {
+ int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
+ dfsClient.getConf().getWritePacketSize());
+ computePacketChunkSize(psize, bytesPerChecksum);
+ }
+ }
+
+ /**
+ * If a block boundary is reached, send an empty packet to
+ * indicate the end of the block and reset bytesCurBlock.
+ *
+ * @throws IOException
+ */
+ protected void endBlock() throws IOException {
+ if (getStreamer().getBytesCurBlock() == blockSize) {
+ setCurrentPacketToEmpty();
+ enqueueCurrentPacket();
+ getStreamer().setBytesCurBlock(0);
+ lastFlushOffset = 0;
+ }
+ }
+
+ /**
+ * Flushes out to all replicas of the block. The data is in the buffers
+ * of the DNs but not necessarily in the DNs' OS buffers.
+ *
+ * It is a synchronous operation. When it returns,
+ * it guarantees that flushed data becomes visible to new readers.
+ * It is not guaranteed that data has been flushed to
+ * persistent store on the datanode.
+ * Block allocations are persisted on the namenode.
+ */
+ @Override
+ public void hflush() throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("hflush", src);
+ try {
+ flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+ } finally {
+ scope.close();
+ }
+ }
+
+ @Override
+ public void hsync() throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("hsync", src);
+ try {
+ flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * The expected semantics is that all data has been flushed out to all replicas
+ * and all replicas have done the posix fsync equivalent, i.e. the OS has
+ * flushed it to the disk device (but the disk may still have it in its cache).
+ *
+ * Note that only the current block is flushed to the disk device.
+ * To guarantee durable sync across block boundaries the stream should
+ * be created with {@link CreateFlag#SYNC_BLOCK}.
+ *
+ * @param syncFlags
+ * Indicate the semantics of the sync. Currently used to specify
+ * whether or not to update the block length in the NameNode.
+ */
+ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("hsync", src);
+ try {
+ flushOrSync(true, syncFlags);
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Flush/Sync buffered data to DataNodes.
+ *
+ * @param isSync
+ * Whether or not to require all replicas to flush data to the disk
+ * device
+ * @param syncFlags
+ * Indicate the detailed semantics of the flush/sync. Currently
+ * mainly used to specify whether or not to update the file length in
+ * the NameNode.
+ * @throws IOException
+ */
+ private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
+ throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+ try {
+ long toWaitFor;
+ long lastBlockLength = -1L;
+ boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+ boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
+ synchronized (this) {
+ // flush checksum buffer, but keep checksum buffer intact if we do not
+ // need to end the current block
+ int numKept = flushBuffer(!endBlock, true);
+ // bytesCurBlock potentially incremented if there was buffered data
+
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("DFSClient flush(): "
+ + " bytesCurBlock=" + getStreamer().getBytesCurBlock()
+ + " lastFlushOffset=" + lastFlushOffset
+ + " createNewBlock=" + endBlock);
+ }
+ // Flush only if we haven't already flushed up to this offset.
+ if (lastFlushOffset != getStreamer().getBytesCurBlock()) {
+ assert getStreamer().getBytesCurBlock() > lastFlushOffset;
+ // record the valid offset of this flush
+ lastFlushOffset = getStreamer().getBytesCurBlock();
+ if (isSync && currentPacket == null && !endBlock) {
+ // Nothing to send right now,
+ // but sync was requested.
+ // Send an empty packet if we do not end the block right now
+ currentPacket = createPacket(packetSize, chunksPerPacket,
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
+ }
+ } else {
+ if (isSync && getStreamer().getBytesCurBlock() > 0 && !endBlock) {
+ // Nothing to send right now,
+ // and the block was partially written,
+ // and sync was requested.
+ // So send an empty sync packet if we do not end the block right
+ // now
+ currentPacket = createPacket(packetSize, chunksPerPacket,
+ getStreamer().getBytesCurBlock(), getStreamer()
+ .getAndIncCurrentSeqno(), false);
+ } else if (currentPacket != null) {
+ // just discard the current packet since it has already been sent.
+ currentPacket.releaseBuffer(byteArrayManager);
+ currentPacket = null;
+ }
+ }
+ if (currentPacket != null) {
+ currentPacket.setSyncBlock(isSync);
+ enqueueCurrentPacket();
+ }
+ if (endBlock && getStreamer().getBytesCurBlock() > 0) {
+ // Need to end the current block, thus send an empty packet to
+ // indicate this is the end of the block and reset bytesCurBlock
+ currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
+ getStreamer().getAndIncCurrentSeqno(), true);
+ currentPacket.setSyncBlock(shouldSyncBlock || isSync);
+ enqueueCurrentPacket();
+ getStreamer().setBytesCurBlock(0);
+ lastFlushOffset = 0;
+ } else {
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ getStreamer().setBytesCurBlock(
+ getStreamer().getBytesCurBlock() - numKept);
+ }
+
+ toWaitFor = getStreamer().getLastQueuedSeqno();
+ } // end synchronized
+
+ getStreamer().waitForAckedSeqno(toWaitFor);
+
+ // update the block length first time irrespective of flag
+ if (updateLength || getStreamer().getPersistBlocks().get()) {
+ synchronized (this) {
+ if (!getStreamer().streamerClosed()
+ && getStreamer().getBlock() != null) {
+ lastBlockLength = getStreamer().getBlock().getNumBytes();
+ }
+ }
+ }
+ // If 1) any new blocks were allocated since the last flush, or 2) the
+ // length needs to be updated in the NN, then persist the block locations
+ // on the namenode.
+ if (getStreamer().getPersistBlocks().getAndSet(false) || updateLength) {
+ try {
+ dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+ lastBlockLength);
+ } catch (IOException ioe) {
+ DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+ // If we got an error here, it might be because some other thread called
+ // close before our hflush completed. In that case, we should throw an
+ // exception that the stream is closed.
+ checkClosed();
+ // If we aren't closed but failed to sync, we should expose that to the
+ // caller.
+ throw ioe;
+ }
+ }
+
+ synchronized(this) {
+ if (!getStreamer().streamerClosed()) {
+ getStreamer().setHflush();
+ }
+ }
+ } catch (InterruptedIOException interrupt) {
+ // This kind of error doesn't mean that the stream itself is broken - just the
+ // flushing thread got interrupted. So, we shouldn't close down the writer,
+ // but instead just propagate the error
+ throw interrupt;
+ } catch (IOException e) {
+ DFSClient.LOG.warn("Error while syncing", e);
+ synchronized (this) {
+ if (!isClosed()) {
+ getStreamer().getLastException().set(e);
+ closeThreads(true);
+ }
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
+ */
+ @Deprecated
+ public synchronized int getNumCurrentReplicas() throws IOException {
+ return getCurrentBlockReplication();
+ }
+
+ /**
+ * Note that this is not a public API;
+ * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
+ *
+ * @return the number of valid replicas of the current block
+ */
+ public synchronized int getCurrentBlockReplication() throws IOException {
+ dfsClient.checkOpen();
+ checkClosed();
+ if (getStreamer().streamerClosed()) {
+ return blockReplication; // no pipeline, return repl factor of file
+ }
+ DatanodeInfo[] currentNodes = getStreamer().getNodes();
+ if (currentNodes == null) {
+ return blockReplication; // no pipeline, return repl factor of file
+ }
+ return currentNodes.length;
+ }
+
+ /**
+ * Waits until all existing data is flushed and confirmations
+ * have been received from the datanodes.
+ */
+ protected void flushInternal() throws IOException {
+ long toWaitFor;
+ synchronized (this) {
+ dfsClient.checkOpen();
+ checkClosed();
+ //
+ // If there is data in the current buffer, send it across
+ //
+ getStreamer().queuePacket(currentPacket);
+ currentPacket = null;
+ toWaitFor = getStreamer().getLastQueuedSeqno();
+ }
+
+ getStreamer().waitForAckedSeqno(toWaitFor);
+ }
+
+ protected synchronized void start() {
+ getStreamer().start();
+ }
+
+ /**
+ * Aborts this output stream and releases any system
+ * resources associated with this stream.
+ */
+ synchronized void abort() throws IOException {
+ if (isClosed()) {
+ return;
+ }
+ getStreamer().getLastException().set(new IOException("Lease timeout of "
+ + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
+ closeThreads(true);
+ dfsClient.endFileLease(fileId);
+ }
+
+ boolean isClosed() {
+ return closed || getStreamer().streamerClosed();
+ }
+
+ void setClosed() {
+ closed = true;
+ getStreamer().release();
+ }
+
+ // shutdown datastreamer and responseprocessor threads.
+ // interrupt datastreamer if force is true
+ protected void closeThreads(boolean force) throws IOException {
+ try {
+ getStreamer().close(force);
+ getStreamer().join();
+ getStreamer().closeSocket();
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to shutdown streamer");
+ } finally {
+ getStreamer().setSocketToNull();
+ setClosed();
+ }
+ }
+
+ /**
+ * Closes this output stream and releases any system
+ * resources associated with this stream.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSOutputStream#close", src);
+ try {
+ closeImpl();
+ } finally {
+ scope.close();
+ }
+ }
+
+ protected synchronized void closeImpl() throws IOException {
+ if (isClosed()) {
+ getStreamer().getLastException().check(true);
+ return;
+ }
+
+ try {
+ flushBuffer(); // flush from all upper layers
+
+ if (currentPacket != null) {
+ enqueueCurrentPacket();
+ }
+
+ if (getStreamer().getBytesCurBlock() != 0) {
+ setCurrentPacketToEmpty();
+ }
+
+ flushInternal(); // flush all data to Datanodes
+ // get last block before destroying the streamer
+ ExtendedBlock lastBlock = getStreamer().getBlock();
+ closeThreads(false);
+ TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+ try {
+ completeFile(lastBlock);
+ } finally {
+ scope.close();
+ }
+ dfsClient.endFileLease(fileId);
+ } catch (ClosedChannelException e) {
+ } finally {
+ setClosed();
+ }
+ }
+
+ // should be called holding (this) lock since setTestFilename() may
+ // be called during unit tests
+ protected void completeFile(ExtendedBlock last) throws IOException {
+ long localstart = Time.monotonicNow();
+ final DfsClientConf conf = dfsClient.getConf();
+ long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
+ boolean fileComplete = false;
+ int retries = conf.getNumBlockWriteLocateFollowingRetry();
+ while (!fileComplete) {
+ fileComplete =
+ dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
+ if (!fileComplete) {
+ final int hdfsTimeout = conf.getHdfsTimeout();
+ if (!dfsClient.clientRunning
+ || (hdfsTimeout > 0
+ && localstart + hdfsTimeout < Time.monotonicNow())) {
+ String msg = "Unable to close file because dfsclient " +
+ " was unable to contact the HDFS servers." +
+ " clientRunning " + dfsClient.clientRunning +
+ " hdfsTimeout " + hdfsTimeout;
+ DFSClient.LOG.info(msg);
+ throw new IOException(msg);
+ }
+ try {
+ if (retries == 0) {
+ throw new IOException("Unable to close file because the last block"
+ + " does not have enough number of replicas.");
+ }
+ retries--;
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ if (Time.monotonicNow() - localstart > 5000) {
+ DFSClient.LOG.info("Could not complete " + src + " retrying...");
+ }
+ } catch (InterruptedException ie) {
+ DFSClient.LOG.warn("Caught exception ", ie);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void setArtificialSlowdown(long period) {
+ getStreamer().setArtificialSlowdown(period);
+ }
+
+ @VisibleForTesting
+ public synchronized void setChunksPerPacket(int value) {
+ chunksPerPacket = Math.min(chunksPerPacket, value);
+ packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
+ }
+
+ /**
+ * Returns the size of a file as it was when this stream was opened
+ */
+ public long getInitialLen() {
+ return initialFileSize;
+ }
+
+ /**
+ * @return the FileEncryptionInfo for this stream, or null if not encrypted.
+ */
+ public FileEncryptionInfo getFileEncryptionInfo() {
+ return fileEncryptionInfo;
+ }
+
+ /**
+ * Returns the access token currently used by streamer, for testing only
+ */
+ synchronized Token<BlockTokenIdentifier> getBlockToken() {
+ return getStreamer().getBlockToken();
+ }
+
+ @Override
+ public void setDropBehind(Boolean dropBehind) throws IOException {
+ CachingStrategy prevStrategy, nextStrategy;
+ // CachingStrategy is immutable. So build a new CachingStrategy with the
+ // modifications we want, and compare-and-swap it in.
+ do {
+ prevStrategy = this.cachingStrategy.get();
+ nextStrategy = new CachingStrategy.Builder(prevStrategy).
+ setDropBehind(dropBehind).build();
+ } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
+ }
+
+ @VisibleForTesting
+ ExtendedBlock getBlock() {
+ return getStreamer().getBlock();
+ }
+
+ @VisibleForTesting
+ public long getFileId() {
+ return fileId;
+ }
+
+ /**
+ * Return the source of stream.
+ */
+ String getSrc() {
+ return src;
+ }
+
+ /**
+ * Returns the data streamer object.
+ */
+ protected DataStreamer getStreamer() {
+ return streamer;
+ }
+}
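
For readers following the sizing logic in computePacketChunkSize() above, here is a minimal standalone sketch (not part of this commit) of the same arithmetic, assuming the usual defaults of a 64K write packet, 512-byte chunks, and a 4-byte checksum; HEADER_RESERVE is an assumed stand-in for PacketHeader.PKT_MAX_HEADER_LEN, whose real value comes from the serialized packet header.

    // A minimal sketch of the computePacketChunkSize() arithmetic, assuming
    // default sizes; HEADER_RESERVE stands in for PacketHeader.PKT_MAX_HEADER_LEN.
    public class PacketSizingSketch {
      private static final int HEADER_RESERVE = 33; // assumed, not the real constant

      public static void main(String[] args) {
        int writePacketSize = 64 * 1024; // dfs.client-write-packet-size default
        int bytesPerChecksum = 512;      // dfs.bytes-per-checksum default
        int checksumSize = 4;            // a CRC32C checksum is 4 bytes

        int bodySize = writePacketSize - HEADER_RESERVE;
        int chunkSize = bytesPerChecksum + checksumSize;   // data chunk + its checksum
        int chunksPerPacket = Math.max(bodySize / chunkSize, 1);
        int packetSize = chunkSize * chunksPerPacket;      // excludes the header

        System.out.println("chunksPerPacket=" + chunksPerPacket
            + ", packetSize=" + packetSize);
      }
    }

With these assumed defaults a packet carries 126 chunks, i.e. roughly 63K of data plus checksums, which matches the "typically 64K" figure in the class javadoc.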
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
new file mode 100755
index 0000000..22055c3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.htrace.Span;
+
+/****************************************************************
+ * DFSPacket is used by DataStreamer and DFSOutputStream.
+ * DFSOutputStream generates packets and then asks DataStreamer
+ * to send them to the datanodes.
+ ****************************************************************/
+
+@InterfaceAudience.Private
+class DFSPacket {
+ public static final long HEART_BEAT_SEQNO = -1L;
+ private static long[] EMPTY = new long[0];
+ private final long seqno; // sequence number of buffer in block
+ private final long offsetInBlock; // offset in block
+ private boolean syncBlock; // this packet forces the current block to disk
+ private int numChunks; // number of chunks currently in packet
+ private final int maxChunks; // max chunks in packet
+ private byte[] buf;
+ private final boolean lastPacketInBlock; // is this the last packet in block?
+
+ /**
+ * buf is laid out as follows:
+ * (C is checksum data, D is payload data)
+ *
+ *  [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+ *            ^        ^               ^               ^
+ *            |  checksumPos           dataStart       dataPos
+ *        checksumStart
+ *
+ * Right before sending, we move the checksum data to immediately precede
+ * the actual data, and then insert the header into the buffer immediately
+ * preceding the checksum data, so we make sure to keep enough space in
+ * front of the checksum data to support the largest conceivable header.
+ */
+ private int checksumStart;
+ private int checksumPos;
+ private final int dataStart;
+ private int dataPos;
+ private long[] traceParents = EMPTY;
+ private int traceParentsUsed;
+ private Span span;
+
+ /**
+ * Create a new packet.
+ *
+ * @param buf the buffer storing data and checksums
+ * @param chunksPerPkt maximum number of chunks per packet.
+ * @param offsetInBlock offset in bytes into the HDFS block.
+ * @param seqno the sequence number of this packet
+ * @param checksumSize the size of checksum
+ * @param lastPacketInBlock if this is the last packet
+ */
+ DFSPacket(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
+ int checksumSize, boolean lastPacketInBlock) {
+ this.lastPacketInBlock = lastPacketInBlock;
+ this.numChunks = 0;
+ this.offsetInBlock = offsetInBlock;
+ this.seqno = seqno;
+
+ this.buf = buf;
+
+ checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
+ checksumPos = checksumStart;
+ dataStart = checksumStart + (chunksPerPkt * checksumSize);
+ dataPos = dataStart;
+ maxChunks = chunksPerPkt;
+ }
+
+ /**
+ * Write data to this packet.
+ *
+ * @param inarray input array of data
+ * @param off the offset of data to write
+ * @param len the length of data to write
+ * @throws ClosedChannelException
+ */
+ synchronized void writeData(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (dataPos + len > buf.length) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, dataPos, len);
+ dataPos += len;
+ }
+
+ /**
+ * Write checksums to this packet
+ *
+ * @param inarray input array of checksums
+ * @param off the offset of checksums to write
+ * @param len the length of checksums to write
+ * @throws ClosedChannelException
+ */
+ synchronized void writeChecksum(byte[] inarray, int off, int len)
+ throws ClosedChannelException {
+ checkBuffer();
+ if (len == 0) {
+ return;
+ }
+ if (checksumPos + len > dataStart) {
+ throw new BufferOverflowException();
+ }
+ System.arraycopy(inarray, off, buf, checksumPos, len);
+ checksumPos += len;
+ }
+
+ /**
+ * Write the full packet, including the header, to the given output stream.
+ *
+ * @param stm the output stream to write this packet to
+ * @throws IOException
+ */
+ synchronized void writeTo(DataOutputStream stm) throws IOException {
+ checkBuffer();
+
+ final int dataLen = dataPos - dataStart;
+ final int checksumLen = checksumPos - checksumStart;
+ final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
+
+ if (checksumPos != dataStart) {
+ // Move the checksum to cover the gap. This can happen for the last
+ // packet or during an hflush/hsync call.
+ System.arraycopy(buf, checksumStart, buf,
+ dataStart - checksumLen , checksumLen);
+ checksumPos = dataStart;
+ checksumStart = checksumPos - checksumLen;
+ }
+
+ final int headerStart = checksumStart - header.getSerializedSize();
+ assert checksumStart + 1 >= header.getSerializedSize();
+ assert headerStart >= 0;
+ assert headerStart + header.getSerializedSize() == checksumStart;
+
+ // Copy the header data into the buffer immediately preceding the checksum
+ // data.
+ System.arraycopy(header.getBytes(), 0, buf, headerStart,
+ header.getSerializedSize());
+
+ // corrupt the data for testing.
+ if (DFSClientFaultInjector.get().corruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+
+ // Write the now contiguous full packet to the output stream.
+ stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
+
+ // undo corruption.
+ if (DFSClientFaultInjector.get().uncorruptPacket()) {
+ buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
+ }
+ }
+
+ private synchronized void checkBuffer() throws ClosedChannelException {
+ if (buf == null) {
+ throw new ClosedChannelException();
+ }
+ }
+
+ /**
+ * Release the buffer in this packet to ByteArrayManager.
+ *
+ * @param bam the ByteArrayManager to return the buffer to
+ */
+ synchronized void releaseBuffer(ByteArrayManager bam) {
+ bam.release(buf);
+ buf = null;
+ }
+
+ /**
+ * get the packet's last byte's offset in the block
+ *
+ * @return the packet's last byte's offset in the block
+ */
+ synchronized long getLastByteOffsetBlock() {
+ return offsetInBlock + dataPos - dataStart;
+ }
+
+ /**
+ * Check if this packet is a heartbeat packet
+ *
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
+ /**
+ * check if this packet is the last packet in block
+ *
+ * @return true if the packet is the last packet
+ */
+ boolean isLastPacketInBlock(){
+ return lastPacketInBlock;
+ }
+
+ /**
+ * get sequence number of this packet
+ *
+ * @return the sequence number of this packet
+ */
+ long getSeqno(){
+ return seqno;
+ }
+
+ /**
+ * get the number of chunks this packet contains
+ *
+ * @return the number of chunks in this packet
+ */
+ synchronized int getNumChunks(){
+ return numChunks;
+ }
+
+ /**
+ * increase the number of chunks by one
+ */
+ synchronized void incNumChunks(){
+ numChunks++;
+ }
+
+ /**
+ * get the maximum number of chunks this packet can hold
+ *
+ * @return the maximum number of chunks in this packet
+ */
+ int getMaxChunks(){
+ return maxChunks;
+ }
+
+ /**
+ * set whether to sync the block to disk
+ *
+ * @param syncBlock whether to sync the block
+ */
+ synchronized void setSyncBlock(boolean syncBlock){
+ this.syncBlock = syncBlock;
+ }
+
+ @Override
+ public String toString() {
+ return "packet seqno: " + this.seqno +
+ " offsetInBlock: " + this.offsetInBlock +
+ " lastPacketInBlock: " + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
+
+ /**
+ * Add a trace parent span for this packet.<p/>
+ *
+ * Trace parent spans for a packet are the trace spans responsible for
+ * adding data to that packet. We store them as an array of longs for
+ * efficiency.<p/>
+ *
+ * Protected by the DFSOutputStream dataQueue lock.
+ */
+ public void addTraceParent(Span span) {
+ if (span == null) {
+ return;
+ }
+ addTraceParent(span.getSpanId());
+ }
+
+ public void addTraceParent(long id) {
+ if (traceParentsUsed == traceParents.length) {
+ int newLength = (traceParents.length == 0) ? 8 :
+ traceParents.length * 2;
+ traceParents = Arrays.copyOf(traceParents, newLength);
+ }
+ traceParents[traceParentsUsed] = id;
+ traceParentsUsed++;
+ }
+
+ /**
+ * Get the trace parent spans for this packet.<p/>
+ *
+ * Will always be non-null.<p/>
+ *
+ * Protected by the DFSOutputStream dataQueue lock.
+ */
+ public long[] getTraceParents() {
+ // Remove duplicates from the array.
+ int len = traceParentsUsed;
+ Arrays.sort(traceParents, 0, len);
+ int i = 0, j = 0;
+ long prevVal = 0; // 0 is not a valid span id
+ while (true) {
+ if (i == len) {
+ break;
+ }
+ long val = traceParents[i];
+ if (val != prevVal) {
+ traceParents[j] = val;
+ j++;
+ prevVal = val;
+ }
+ i++;
+ }
+ if (j < traceParents.length) {
+ traceParents = Arrays.copyOf(traceParents, j);
+ traceParentsUsed = traceParents.length;
+ }
+ return traceParents;
+ }
+
+ public void setTraceSpan(Span span) {
+ this.span = span;
+ }
+
+ public Span getTraceSpan() {
+ return span;
+ }
+}
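
The buffer-layout comment near the top of DFSPacket describes how checksums and data are written into disjoint regions and then compacted in writeTo(). The following is a toy sketch of that compaction (not part of this commit), using made-up sizes rather than the real header and checksum constants.

    // Toy model of the DFSPacket buffer layout and the writeTo() compaction.
    // All sizes here are illustrative assumptions, not the real defaults.
    public class PacketLayoutSketch {
      public static void main(String[] args) {
        int headerReserve = 8;  // stand-in for PacketHeader.PKT_MAX_HEADER_LEN
        int chunksPerPkt = 4;
        int checksumSize = 4;

        int checksumStart = headerReserve;
        int dataStart = checksumStart + chunksPerPkt * checksumSize;
        byte[] buf = new byte[dataStart + 64];

        // Suppose only one chunk was written (as for the last, partially
        // filled packet of a block): one checksum and a few data bytes.
        int checksumPos = checksumStart + checksumSize;
        int dataPos = dataStart + 10;
        int checksumLen = checksumPos - checksumStart;
        int dataLen = dataPos - dataStart;

        // writeTo() slides the checksums down so that they immediately
        // precede the data, leaving one contiguous region to send:
        // [header][checksums][data]
        if (checksumPos != dataStart) {
          System.arraycopy(buf, checksumStart, buf,
              dataStart - checksumLen, checksumLen);
          checksumStart = dataStart - checksumLen;
        }
        System.out.println("send " + (checksumLen + dataLen)
            + " bytes starting just before offset " + checksumStart);
      }
    }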
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 359886e..e275afb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -590,6 +591,29 @@ public class DFSUtilClient {
}
}
+ public static int getIoFileBufferSize(Configuration conf) {
+ return conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ }
+
+ public static int getSmallBufferSize(Configuration conf) {
+ return Math.min(getIoFileBufferSize(conf) / 2, 512);
+ }
+
+ /**
+ * Probe for HDFS Encryption being enabled; this uses the value of
+ * the option {@link HdfsClientConfigKeys#DFS_ENCRYPTION_KEY_PROVIDER_URI},
+ * returning true if that property contains a non-empty, non-whitespace
+ * string.
+ * @param conf configuration to probe
+ * @return true if encryption is considered enabled.
+ */
+ public static boolean isHDFSEncryptionEnabled(Configuration conf) {
+ return !conf.getTrimmed(
+ HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
+ }
+
public static InetSocketAddress getNNAddress(String address) {
return NetUtils.createSocketAddr(address,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT);
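
A short usage sketch for the helpers added to DFSUtilClient in the hunk above, assuming a default Configuration; the key provider URI shown is only illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class DFSUtilClientSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // io.file.buffer.size (typically 4096), and the "small" buffer size,
        // which is half of it but capped at 512.
        int ioBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
        int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);

        // Encryption is considered enabled once a non-empty key provider URI
        // is configured.
        boolean before = DFSUtilClient.isHDFSEncryptionEnabled(conf);
        conf.set(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
            "kms://http@kms.example.com:9600/kms"); // illustrative URI
        boolean after = DFSUtilClient.isHDFSEncryptionEnabled(conf);

        System.out.println(ioBufferSize + " " + smallBufferSize
            + " " + before + " " + after);
      }
    }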