Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:24 UTC
[22/50] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream
and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
new file mode 100644
index 0000000..12496e2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting faults in DFSClient and DFSOutputStream tests.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class DFSClientFaultInjector {
+ private static DFSClientFaultInjector instance = new DFSClientFaultInjector();
+ public static AtomicLong exceptionNum = new AtomicLong(0);
+
+ public static DFSClientFaultInjector get() {
+ return instance;
+ }
+ public static void set(DFSClientFaultInjector instance) {
+ DFSClientFaultInjector.instance = instance;
+ }
+
+ public boolean corruptPacket() {
+ return false;
+ }
+
+ public boolean uncorruptPacket() {
+ return false;
+ }
+
+ public boolean failPacket() {
+ return false;
+ }
+
+ public void startFetchFromDatanode() {}
+
+ public void fetchFromDatanodeException() {}
+
+ public void readFromDatanodeDelay() {}
+}
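
As a quick illustration of the hook above: a test can subclass the injector and install it with set(), so that a chosen method returns true at the point the client checks it. A minimal sketch (the subclass and the surrounding test code are illustrative, not part of this patch):

    // Hypothetical test helper: fail the first packet to exercise recovery.
    class FailFirstPacketInjector extends DFSClientFaultInjector {
      @Override
      public boolean failPacket() {
        // Only fail once; subsequent packets succeed.
        return exceptionNum.getAndIncrement() == 0;
      }
    }

    // In a test: install the injector, run the I/O under test, then restore.
    DFSClientFaultInjector.set(new FailFirstPacketInjector());
    try {
      // ... perform reads/writes through DFSClient ...
    } finally {
      DFSClientFaultInjector.set(new DFSClientFaultInjector());
    }
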
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
new file mode 100644
index 0000000..2a228e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * The client-side metrics for the hedged read feature.
+ * This class has a number of metrics variables that are publicly accessible,
+ * so that clients, such as HBase, can grab them from the client side.
+ */
+@InterfaceAudience.Private
+public class DFSHedgedReadMetrics {
+ public final AtomicLong hedgedReadOps = new AtomicLong();
+ public final AtomicLong hedgedReadOpsWin = new AtomicLong();
+ public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+
+ public void incHedgedReadOps() {
+ hedgedReadOps.incrementAndGet();
+ }
+
+ public void incHedgedReadOpsInCurThread() {
+ hedgedReadOpsInCurThread.incrementAndGet();
+ }
+
+ public void incHedgedReadWins() {
+ hedgedReadOpsWin.incrementAndGet();
+ }
+
+ public long getHedgedReadOps() {
+ return hedgedReadOps.longValue();
+ }
+
+ public long getHedgedReadOpsInCurThread() {
+ return hedgedReadOpsInCurThread.longValue();
+ }
+
+ public long getHedgedReadWins() {
+ return hedgedReadOpsWin.longValue();
+ }
+}
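
For a rough sense of how these counters are consumed: a client application fetches the metrics object once and samples the counters periodically. A hedged sketch, assuming an open DFSClient named dfsClient (DFSClient#getHedgedReadMetrics() is referenced later in this patch):

    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
    long ops = metrics.getHedgedReadOps();                  // hedged reads started
    long wins = metrics.getHedgedReadWins();                // hedged read returned first
    long inThread = metrics.getHedgedReadOpsInCurThread();  // pool full; read ran in caller's thread
    System.out.println("hedged=" + ops + " wins=" + wins + " inThread=" + inThread);
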
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
new file mode 100644
index 0000000..11a1d29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.Iterators;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.util.Time;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stream for reading inotify events. DFSInotifyEventInputStreams should not
+ * be shared among multiple threads.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DFSInotifyEventInputStream {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ DFSInotifyEventInputStream.class);
+
+ /**
+ * The trace sampler to use when making RPCs to the NameNode.
+ */
+ private final Sampler<?> traceSampler;
+
+ private final ClientProtocol namenode;
+ private Iterator<EventBatch> it;
+ private long lastReadTxid;
+ /**
+ * The most recent txid the NameNode told us it has sync'ed -- helps us
+ * determine how far behind we are in the edit stream.
+ */
+ private long syncTxid;
+ /**
+ * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
+ */
+ private Random rng = new Random();
+
+ private static final int INITIAL_WAIT_MS = 10;
+
+ DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
+ throws IOException {
+ // Only consider new transaction IDs.
+ this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
+ }
+
+ DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode,
+ long lastReadTxid) throws IOException {
+ this.traceSampler = traceSampler;
+ this.namenode = namenode;
+ this.it = Iterators.emptyIterator();
+ this.lastReadTxid = lastReadTxid;
+ }
+
+ /**
+ * Returns the next batch of events in the stream or null if no new
+ * batches are currently available.
+ *
+ * @throws IOException in case of a network error or edit log
+ * corruption. Also possible if JournalNodes are unresponsive in the
+ * QJM setting (even one unresponsive JournalNode is enough in rare cases),
+ * so catching this exception and retrying at least a few times is
+ * recommended.
+ * @throws MissingEventsException if we cannot return the next batch in the
+ * stream because the data for the events (and possibly some subsequent
+ * events) has been deleted (generally because this stream is a very large
+ * number of transactions behind the current state of the NameNode). It is
+ * safe to continue reading from the stream after this exception is thrown;
+ * the next available batch of events will be returned.
+ */
+ public EventBatch poll() throws IOException, MissingEventsException {
+ TraceScope scope =
+ Trace.startSpan("inotifyPoll", traceSampler);
+ try {
+ // need to keep retrying until the NN sends us the latest committed txid
+ if (lastReadTxid == -1) {
+ LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
+ lastReadTxid = namenode.getCurrentEditLogTxid();
+ return null;
+ }
+ if (!it.hasNext()) {
+ EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+ if (el.getLastTxid() != -1) {
+ // we only want to set syncTxid when we were actually able to read some
+ // edits on the NN -- otherwise it will seem like edits are being
+ // generated faster than we can read them when the problem is really
+ // that we are temporarily unable to read edits
+ syncTxid = el.getSyncTxid();
+ it = el.getBatches().iterator();
+ long formerLastReadTxid = lastReadTxid;
+ lastReadTxid = el.getLastTxid();
+ if (el.getFirstTxid() != formerLastReadTxid + 1) {
+ throw new MissingEventsException(formerLastReadTxid + 1,
+ el.getFirstTxid());
+ }
+ } else {
+ LOG.debug("poll(): read no edits from the NN when requesting edits " +
+ "after txid {}", lastReadTxid);
+ return null;
+ }
+ }
+
+ if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
+ // newly seen edit log ops actually got converted to events
+ return it.next();
+ } else {
+ return null;
+ }
+ } finally {
+ scope.close();
+ }
+ }
+
+ /**
+ * Return an estimate of how many transaction IDs behind the NameNode's
+ * current state this stream is. Clients should periodically call this method
+ * and check if its result is steadily increasing, which indicates that they
+ * are falling behind (i.e. transactions are being generated faster than the
+ * client is reading them). If a client falls too far behind events may be
+ * deleted before the client can read them.
+ * <p/>
+ * A return value of -1 indicates that an estimate could not be produced, and
+ * should be ignored. The value returned by this method is really only useful
+ * when compared to previous or subsequent returned values.
+ */
+ public long getTxidsBehindEstimate() {
+ if (syncTxid == 0) {
+ return -1;
+ } else {
+ assert syncTxid >= lastReadTxid;
+ // this gives the difference between the last txid we have fetched to the
+ // client and syncTxid at the time we last fetched events from the
+ // NameNode
+ return syncTxid - lastReadTxid;
+ }
+ }
+
+ /**
+ * Returns the next event batch in the stream, waiting up to the specified
+ * amount of time for a new batch. Returns null if no new batch arrives
+ * within that time. The method may block past the specified timeout by up
+ * to the time required for an RPC to the NameNode.
+ *
+ * @param time number of units of the given TimeUnit to wait
+ * @param tu the desired TimeUnit
+ * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
+ * @throws MissingEventsException
+ * see {@link DFSInotifyEventInputStream#poll()}
+ * @throws InterruptedException if the calling thread is interrupted
+ */
+ public EventBatch poll(long time, TimeUnit tu) throws IOException,
+ InterruptedException, MissingEventsException {
+ TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
+ EventBatch next = null;
+ try {
+ long initialTime = Time.monotonicNow();
+ long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
+ long nextWait = INITIAL_WAIT_MS;
+ while ((next = poll()) == null) {
+ long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
+ if (timeLeft <= 0) {
+ LOG.debug("timed poll(): timed out");
+ break;
+ } else if (timeLeft < nextWait * 2) {
+ nextWait = timeLeft;
+ } else {
+ nextWait *= 2;
+ }
+ LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
+ nextWait);
+ Thread.sleep(nextWait);
+ }
+ } finally {
+ scope.close();
+ }
+ return next;
+ }
+
+ /**
+ * Returns the next batch of events in the stream, waiting indefinitely if
+ * a new batch is not immediately available.
+ *
+ * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
+ * @throws MissingEventsException see
+ * {@link DFSInotifyEventInputStream#poll()}
+ * @throws InterruptedException if the calling thread is interrupted
+ */
+ public EventBatch take() throws IOException, InterruptedException,
+ MissingEventsException {
+ TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
+ EventBatch next = null;
+ try {
+ int nextWaitMin = INITIAL_WAIT_MS;
+ while ((next = poll()) == null) {
+ // sleep for a random period between nextWaitMin and nextWaitMin * 2
+ // to avoid stampedes at the NN if there are multiple clients
+ int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
+ LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
+ Thread.sleep(sleepTime);
+ // the maximum sleep is 2 minutes
+ nextWaitMin = Math.min(60000, nextWaitMin * 2);
+ }
+ } finally {
+ scope.close();
+ }
+
+ return next;
+ }
+}
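
To make the poll/take contract above concrete: a consumer typically obtains the stream (e.g. via HdfsAdmin#getInotifyEventStream(), which is outside this patch), then loops on poll with a timeout and handles MissingEventsException if it has fallen too far behind. A minimal sketch under those assumptions (the running flag is illustrative; IOException and InterruptedException are left to propagate, per the poll() javadoc above):

    DFSInotifyEventInputStream stream = hdfsAdmin.getInotifyEventStream();
    while (running) {
      try {
        EventBatch batch = stream.poll(1, TimeUnit.SECONDS); // bounded wait
        if (batch == null) {
          continue; // no new batches within the timeout
        }
        for (Event event : batch.getEvents()) {
          System.out.println("txid " + batch.getTxid() + ": " + event.getEventType());
        }
      } catch (MissingEventsException e) {
        // Events were purged before we read them; it is safe to keep
        // reading, but intermediate events were lost.
      }
    }
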
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
new file mode 100644
index 0000000..139a27c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -0,0 +1,1915 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
+import org.apache.hadoop.fs.CanSetDropBehind;
+import org.apache.hadoop.fs.CanSetReadahead;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/****************************************************************
+ * DFSInputStream provides bytes from a named file. It handles
+ * negotiation with the namenode and various datanodes as necessary.
+ ****************************************************************/
+@InterfaceAudience.Private
+public class DFSInputStream extends FSInputStream
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess, CanUnbuffer {
+ @VisibleForTesting
+ public static boolean tcpReadsDisabledForTesting = false;
+ private long hedgedReadOpsLoopNumForTesting = 0;
+ protected final DFSClient dfsClient;
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+ protected final String src;
+ protected final boolean verifyChecksum;
+
+ // state used by stateful read only:
+ // (protected by lock on this)
+ /////
+ private DatanodeInfo currentNode = null;
+ protected LocatedBlock currentLocatedBlock = null;
+ protected long pos = 0;
+ protected long blockEnd = -1;
+ private BlockReader blockReader = null;
+ ////
+
+ // state shared by stateful and positional read:
+ // (protected by lock on infoLock)
+ ////
+ protected LocatedBlocks locatedBlocks = null;
+ private long lastBlockBeingWrittenLength = 0;
+ private FileEncryptionInfo fileEncryptionInfo = null;
+ protected CachingStrategy cachingStrategy;
+ ////
+
+ protected final ReadStatistics readStatistics = new ReadStatistics();
+ // lock for state shared between read and pread
+ // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
+ // (it's OK to acquire this lock when the lock on <this> is held)
+ protected final Object infoLock = new Object();
+
+ /**
+ * Track the ByteBuffers that we have handed out to readers.
+ *
+ * The value type can be either ByteBufferPool or ClientMmap, depending on
+ * whether this is a memory-mapped buffer or not.
+ */
+ private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers;
+
+ private synchronized IdentityHashStore<ByteBuffer, Object>
+ getExtendedReadBuffers() {
+ if (extendedReadBuffers == null) {
+ extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+ }
+ return extendedReadBuffers;
+ }
+
+ public static class ReadStatistics {
+ public ReadStatistics() {
+ clear();
+ }
+
+ public ReadStatistics(ReadStatistics rhs) {
+ this.totalBytesRead = rhs.getTotalBytesRead();
+ this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
+ this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+ this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
+ }
+
+ /**
+ * @return The total bytes read. This will always be at least as
+ * high as the other numbers, since it includes all of them.
+ */
+ public long getTotalBytesRead() {
+ return totalBytesRead;
+ }
+
+ /**
+ * @return The total local bytes read. This will always be at least
+ * as high as totalShortCircuitBytesRead, since all short-circuit
+ * reads are also local.
+ */
+ public long getTotalLocalBytesRead() {
+ return totalLocalBytesRead;
+ }
+
+ /**
+ * @return The total short-circuit local bytes read.
+ */
+ public long getTotalShortCircuitBytesRead() {
+ return totalShortCircuitBytesRead;
+ }
+
+ /**
+ * @return The total number of zero-copy bytes read.
+ */
+ public long getTotalZeroCopyBytesRead() {
+ return totalZeroCopyBytesRead;
+ }
+
+ /**
+ * @return The total number of bytes read which were not local.
+ */
+ public long getRemoteBytesRead() {
+ return totalBytesRead - totalLocalBytesRead;
+ }
+
+ void addRemoteBytes(long amt) {
+ this.totalBytesRead += amt;
+ }
+
+ void addLocalBytes(long amt) {
+ this.totalBytesRead += amt;
+ this.totalLocalBytesRead += amt;
+ }
+
+ void addShortCircuitBytes(long amt) {
+ this.totalBytesRead += amt;
+ this.totalLocalBytesRead += amt;
+ this.totalShortCircuitBytesRead += amt;
+ }
+
+ void addZeroCopyBytes(long amt) {
+ this.totalBytesRead += amt;
+ this.totalLocalBytesRead += amt;
+ this.totalShortCircuitBytesRead += amt;
+ this.totalZeroCopyBytesRead += amt;
+ }
+
+ void clear() {
+ this.totalBytesRead = 0;
+ this.totalLocalBytesRead = 0;
+ this.totalShortCircuitBytesRead = 0;
+ this.totalZeroCopyBytesRead = 0;
+ }
+
+ private long totalBytesRead;
+
+ private long totalLocalBytesRead;
+
+ private long totalShortCircuitBytesRead;
+
+ private long totalZeroCopyBytesRead;
+ }
+
+ /**
+ * This variable tracks the number of failures since the start of the
+ * most recent user-facing operation. That is to say, it should be reset
+ * whenever the user makes a call on this stream, and if at any point
+ * during the retry logic the failure count exceeds a threshold,
+ * the errors will be thrown back to the operation.
+ *
+ * Specifically this counts the number of times the client has gone
+ * back to the namenode to get a new list of block locations, and is
+ * capped at maxBlockAcquireFailures
+ */
+ protected int failures = 0;
+
+ /* XXX Use of ConcurrentHashMap is a temp fix. Need to fix
+ * parallel accesses to DFSInputStream (through preads) properly */
+ private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
+ new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+
+ private byte[] oneByteBuf; // used for 'int read()'
+
+ void addToDeadNodes(DatanodeInfo dnInfo) {
+ deadNodes.put(dnInfo, dnInfo);
+ }
+
+ DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+ LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
+ this.dfsClient = dfsClient;
+ this.verifyChecksum = verifyChecksum;
+ this.src = src;
+ synchronized (infoLock) {
+ this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
+ }
+ this.locatedBlocks = locatedBlocks;
+ openInfo(false);
+ }
+
+ /**
+ * Grab the open-file info from namenode
+ * @param refreshLocatedBlocks whether to re-fetch locatedblocks
+ */
+ void openInfo(boolean refreshLocatedBlocks) throws IOException,
+ UnresolvedLinkException {
+ final DfsClientConf conf = dfsClient.getConf();
+ synchronized(infoLock) {
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
+ int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
+ while (retriesForLastBlockLength > 0) {
+ // Getting last block length as -1 is a special case. When cluster
+ // restarts, DNs may not report immediately. At this time partial block
+ // locations will not be available with NN for getting the length. Let's
+ // retry a configured number of times to get the length.
+ if (lastBlockBeingWrittenLength == -1) {
+ DFSClient.LOG.warn("Last block locations not available. "
+ + "Datanodes might not have reported blocks completely."
+ + " Will retry for " + retriesForLastBlockLength + " times");
+ waitFor(conf.getRetryIntervalForGetLastBlockLength());
+ lastBlockBeingWrittenLength =
+ fetchLocatedBlocksAndGetLastBlockLength(true);
+ } else {
+ break;
+ }
+ retriesForLastBlockLength--;
+ }
+ if (retriesForLastBlockLength == 0) {
+ throw new IOException("Could not obtain the last block locations.");
+ }
+ }
+ }
+
+ private void waitFor(int waitTime) throws IOException {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Interrupted while getting the last block length.");
+ }
+ }
+
+ private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+ throws IOException {
+ LocatedBlocks newInfo = locatedBlocks;
+ if (locatedBlocks == null || refresh) {
+ newInfo = dfsClient.getLocatedBlocks(src, 0);
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("newInfo = " + newInfo);
+ }
+ if (newInfo == null) {
+ throw new IOException("Cannot open filename " + src);
+ }
+
+ if (locatedBlocks != null) {
+ Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+ Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
+ while (oldIter.hasNext() && newIter.hasNext()) {
+ if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+ throw new IOException("Blocklist for " + src + " has changed!");
+ }
+ }
+ }
+ locatedBlocks = newInfo;
+ long lastBlockBeingWrittenLength = 0;
+ if (!locatedBlocks.isLastBlockComplete()) {
+ final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+ if (last != null) {
+ if (last.getLocations().length == 0) {
+ if (last.getBlockSize() == 0) {
+ // if the length is zero, then no data has been written to
+ // datanode. So no need to wait for the locations.
+ return 0;
+ }
+ return -1;
+ }
+ final long len = readBlockLength(last);
+ last.getBlock().setNumBytes(len);
+ lastBlockBeingWrittenLength = len;
+ }
+ }
+
+ fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
+
+ return lastBlockBeingWrittenLength;
+ }
+
+ /** Read the block length from one of the datanodes. */
+ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+ assert locatedblock != null : "LocatedBlock cannot be null";
+ int replicaNotFoundCount = locatedblock.getLocations().length;
+
+ final DfsClientConf conf = dfsClient.getConf();
+ for(DatanodeInfo datanode : locatedblock.getLocations()) {
+ ClientDatanodeProtocol cdp = null;
+
+ try {
+ cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
+ dfsClient.getConfiguration(), conf.getSocketTimeout(),
+ conf.isConnectToDnViaHostname(), locatedblock);
+
+ final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+
+ if (n >= 0) {
+ return n;
+ }
+ }
+ catch(IOException ioe) {
+ if (ioe instanceof RemoteException &&
+ (((RemoteException) ioe).unwrapRemoteException() instanceof
+ ReplicaNotFoundException)) {
+ // special case : replica might not be on the DN, treat as 0 length
+ replicaNotFoundCount--;
+ }
+
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
+ + datanode + " for block " + locatedblock.getBlock(), ioe);
+ }
+ } finally {
+ if (cdp != null) {
+ RPC.stopProxy(cdp);
+ }
+ }
+ }
+
+ // Namenode told us about these locations, but none of them knows about
+ // the replica. This means that we hit the race between pipeline creation
+ // start and end. We require all of them to report ReplicaNotFound because
+ // some other exception could have happened on a DN that has the replica,
+ // and we want to report that error.
+ if (replicaNotFoundCount == 0) {
+ return 0;
+ }
+
+ throw new IOException("Cannot obtain block length for " + locatedblock);
+ }
+
+ public long getFileLength() {
+ synchronized(infoLock) {
+ return locatedBlocks == null? 0:
+ locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
+ }
+ }
+
+ // Short circuit local reads are forbidden for files that are
+ // under construction. See HDFS-2757.
+ boolean shortCircuitForbidden() {
+ synchronized(infoLock) {
+ return locatedBlocks.isUnderConstruction();
+ }
+ }
+
+ /**
+ * Returns the datanode from which the stream is currently reading.
+ */
+ public synchronized DatanodeInfo getCurrentDatanode() {
+ return currentNode;
+ }
+
+ /**
+ * Returns the block containing the target position.
+ */
+ synchronized public ExtendedBlock getCurrentBlock() {
+ if (currentLocatedBlock == null){
+ return null;
+ }
+ return currentLocatedBlock.getBlock();
+ }
+
+ /**
+ * Return the collection of blocks that have already been located.
+ */
+ public List<LocatedBlock> getAllBlocks() throws IOException {
+ return getBlockRange(0, getFileLength());
+ }
+
+ /**
+ * Get block at the specified position.
+ * Fetch it from the namenode if not cached.
+ *
+ * @param offset the block corresponding to this offset in the file is returned
+ * @return located block
+ * @throws IOException
+ */
+ protected LocatedBlock getBlockAt(long offset) throws IOException {
+ synchronized(infoLock) {
+ assert (locatedBlocks != null) : "locatedBlocks is null";
+
+ final LocatedBlock blk;
+
+ //check offset
+ if (offset < 0 || offset >= getFileLength()) {
+ throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+ + offset
+ + ", locatedBlocks=" + locatedBlocks);
+ }
+ else if (offset >= locatedBlocks.getFileLength()) {
+ // offset to the portion of the last block,
+ // which is not known to the name-node yet;
+ // getting the last block
+ blk = locatedBlocks.getLastLocatedBlock();
+ }
+ else {
+ // search cached blocks first
+ int targetBlockIdx = locatedBlocks.findBlock(offset);
+ if (targetBlockIdx < 0) { // block is not cached
+ targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ // fetch more blocks
+ final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
+ assert (newBlocks != null) : "Could not find target position " + offset;
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ }
+ blk = locatedBlocks.get(targetBlockIdx);
+ }
+ return blk;
+ }
+ }
+
+ /** Fetch a block from namenode and cache it */
+ protected void fetchBlockAt(long offset) throws IOException {
+ synchronized(infoLock) {
+ int targetBlockIdx = locatedBlocks.findBlock(offset);
+ if (targetBlockIdx < 0) { // block is not cached
+ targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ }
+ // fetch blocks
+ final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
+ if (newBlocks == null) {
+ throw new IOException("Could not find target position " + offset);
+ }
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+ }
+ }
+
+ /**
+ * Get blocks in the specified range.
+ * Fetch them from the namenode if not cached. This function
+ * does not allow a read request beyond the EOF.
+ * @param offset starting offset in file
+ * @param length length of data
+ * @return consecutive segment of located blocks
+ * @throws IOException
+ */
+ private List<LocatedBlock> getBlockRange(long offset,
+ long length) throws IOException {
+ // getFileLength(): returns total file length
+ // locatedBlocks.getFileLength(): returns length of completed blocks
+ if (offset >= getFileLength()) {
+ throw new IOException("Offset: " + offset +
+ " exceeds file length: " + getFileLength());
+ }
+ synchronized(infoLock) {
+ final List<LocatedBlock> blocks;
+ final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
+ final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
+ final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
+
+ if (readOffsetWithinCompleteBlk) {
+ //get the blocks of finalized (completed) block range
+ blocks = getFinalizedBlockRange(offset,
+ Math.min(length, lengthOfCompleteBlk - offset));
+ } else {
+ blocks = new ArrayList<LocatedBlock>(1);
+ }
+
+ // get the blocks from incomplete block range
+ if (readLengthPastCompleteBlk) {
+ blocks.add(locatedBlocks.getLastLocatedBlock());
+ }
+
+ return blocks;
+ }
+ }
+
+ /**
+ * Get blocks in the specified range.
+ * Includes only the complete blocks.
+ * Fetch them from the namenode if not cached.
+ */
+ private List<LocatedBlock> getFinalizedBlockRange(
+ long offset, long length) throws IOException {
+ synchronized(infoLock) {
+ assert (locatedBlocks != null) : "locatedBlocks is null";
+ List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
+ // search cached blocks first
+ int blockIdx = locatedBlocks.findBlock(offset);
+ if (blockIdx < 0) { // block is not cached
+ blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
+ }
+ long remaining = length;
+ long curOff = offset;
+ while(remaining > 0) {
+ LocatedBlock blk = null;
+ if(blockIdx < locatedBlocks.locatedBlockCount())
+ blk = locatedBlocks.get(blockIdx);
+ if (blk == null || curOff < blk.getStartOffset()) {
+ LocatedBlocks newBlocks;
+ newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
+ locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
+ continue;
+ }
+ assert curOff >= blk.getStartOffset() : "Block not found";
+ blockRange.add(blk);
+ long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
+ remaining -= bytesRead;
+ curOff += bytesRead;
+ blockIdx++;
+ }
+ return blockRange;
+ }
+ }
+
+ /**
+ * Open a DataInputStream to a DataNode so that it can be read from.
+ * We get block ID and the IDs of the destinations at startup, from the namenode.
+ */
+ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
+ if (target >= getFileLength()) {
+ throw new IOException("Attempted to read past end of file");
+ }
+
+ // Will be getting a new BlockReader.
+ closeCurrentBlockReaders();
+
+ //
+ // Connect to best DataNode for desired Block, with potential offset
+ //
+ DatanodeInfo chosenNode = null;
+ int refetchToken = 1; // only need to get a new access token once
+ int refetchEncryptionKey = 1; // only need to get a new encryption key once
+
+ boolean connectFailedOnce = false;
+
+ while (true) {
+ //
+ // Compute desired block
+ //
+ LocatedBlock targetBlock = getBlockAt(target);
+
+ // update current position
+ this.pos = target;
+ this.blockEnd = targetBlock.getStartOffset() +
+ targetBlock.getBlockSize() - 1;
+ this.currentLocatedBlock = targetBlock;
+
+ long offsetIntoBlock = target - targetBlock.getStartOffset();
+
+ DNAddrPair retval = chooseDataNode(targetBlock, null);
+ chosenNode = retval.info;
+ InetSocketAddress targetAddr = retval.addr;
+ StorageType storageType = retval.storageType;
+
+ try {
+ blockReader = getBlockReader(targetBlock, offsetIntoBlock,
+ targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
+ storageType, chosenNode);
+ if(connectFailedOnce) {
+ DFSClient.LOG.info("Successfully connected to " + targetAddr +
+ " for " + targetBlock.getBlock());
+ }
+ return chosenNode;
+ } catch (IOException ex) {
+ if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to " + targetAddr
+ + " : " + ex);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
+ refetchToken--;
+ fetchBlockAt(target);
+ } else {
+ connectFailedOnce = true;
+ DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ + ", add to deadNodes and continue. " + ex, ex);
+ // Put chosen node into dead list, continue
+ addToDeadNodes(chosenNode);
+ }
+ }
+ }
+ }
+
+ protected BlockReader getBlockReader(LocatedBlock targetBlock,
+ long offsetInBlock, long length, InetSocketAddress targetAddr,
+ StorageType storageType, DatanodeInfo datanode) throws IOException {
+ ExtendedBlock blk = targetBlock.getBlock();
+ Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+ CachingStrategy curCachingStrategy;
+ boolean shortCircuitForbidden;
+ synchronized (infoLock) {
+ curCachingStrategy = cachingStrategy;
+ shortCircuitForbidden = shortCircuitForbidden();
+ }
+ return new BlockReaderFactory(dfsClient.getConf()).
+ setInetSocketAddress(targetAddr).
+ setRemotePeerFactory(dfsClient).
+ setDatanodeInfo(datanode).
+ setStorageType(storageType).
+ setFileName(src).
+ setBlock(blk).
+ setBlockToken(accessToken).
+ setStartOffset(offsetInBlock).
+ setVerifyChecksum(verifyChecksum).
+ setClientName(dfsClient.clientName).
+ setLength(length).
+ setCachingStrategy(curCachingStrategy).
+ setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+ setClientCacheContext(dfsClient.getClientContext()).
+ setUserGroupInformation(dfsClient.ugi).
+ setConfiguration(dfsClient.getConfiguration()).
+ build();
+ }
+
+ /**
+ * Close it down!
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (!closed.compareAndSet(false, true)) {
+ DFSClient.LOG.debug("DFSInputStream has been closed already");
+ return;
+ }
+ dfsClient.checkOpen();
+
+ if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
+ final StringBuilder builder = new StringBuilder();
+ extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+ private String prefix = "";
+ @Override
+ public void accept(ByteBuffer k, Object v) {
+ builder.append(prefix).append(k);
+ prefix = ", ";
+ }
+ });
+ DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+ "unreleased ByteBuffers allocated by read(). " +
+ "Please release " + builder.toString() + ".");
+ }
+ closeCurrentBlockReaders();
+ super.close();
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (oneByteBuf == null) {
+ oneByteBuf = new byte[1];
+ }
+ int ret = read( oneByteBuf, 0, 1 );
+ return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
+ }
+
+ /**
+ * Wraps different possible read implementations so that readBuffer can be
+ * strategy-agnostic.
+ */
+ interface ReaderStrategy {
+ public int doRead(BlockReader blockReader, int off, int len)
+ throws ChecksumException, IOException;
+
+ /**
+ * Copy data from the src ByteBuffer into the read buffer.
+ * @param src The src buffer where the data is copied from
+ * @param offset Useful only when the ReadStrategy is based on a byte array.
+ * Indicates the offset of the byte array for the copy.
+ * @param length Useful only when the ReadStrategy is based on a byte array.
+ * Indicates the length of the data to copy.
+ */
+ public int copyFrom(ByteBuffer src, int offset, int length);
+ }
+
+ protected void updateReadStatistics(ReadStatistics readStatistics,
+ int nRead, BlockReader blockReader) {
+ if (nRead <= 0) return;
+ synchronized(infoLock) {
+ if (blockReader.isShortCircuit()) {
+ readStatistics.addShortCircuitBytes(nRead);
+ } else if (blockReader.isLocal()) {
+ readStatistics.addLocalBytes(nRead);
+ } else {
+ readStatistics.addRemoteBytes(nRead);
+ }
+ }
+ }
+
+ /**
+ * Used to read bytes into a byte[]
+ */
+ private class ByteArrayStrategy implements ReaderStrategy {
+ final byte[] buf;
+
+ public ByteArrayStrategy(byte[] buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int doRead(BlockReader blockReader, int off, int len)
+ throws ChecksumException, IOException {
+ int nRead = blockReader.read(buf, off, len);
+ updateReadStatistics(readStatistics, nRead, blockReader);
+ return nRead;
+ }
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ writeSlice.get(buf, offset, length);
+ return length;
+ }
+ }
+
+ /**
+ * Used to read bytes into a user-supplied ByteBuffer
+ */
+ protected class ByteBufferStrategy implements ReaderStrategy {
+ final ByteBuffer buf;
+ ByteBufferStrategy(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public int doRead(BlockReader blockReader, int off, int len)
+ throws ChecksumException, IOException {
+ int oldpos = buf.position();
+ int oldlimit = buf.limit();
+ boolean success = false;
+ try {
+ int ret = blockReader.read(buf);
+ success = true;
+ updateReadStatistics(readStatistics, ret, blockReader);
+ if (ret == 0) {
+ DFSClient.LOG.warn("zero");
+ }
+ return ret;
+ } finally {
+ if (!success) {
+ // Reset to original state so that retries work correctly.
+ buf.position(oldpos);
+ buf.limit(oldlimit);
+ }
+ }
+ }
+
+ @Override
+ public int copyFrom(ByteBuffer src, int offset, int length) {
+ ByteBuffer writeSlice = src.duplicate();
+ int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+ writeSlice.limit(writeSlice.position() + remaining);
+ buf.put(writeSlice);
+ return remaining;
+ }
+ }
+
+ /* This is used by regular read() and handles ChecksumExceptions.
+ * The name readBuffer() is chosen to imply similarity to readBuffer() in
+ * ChecksumFileSystem.
+ */
+ private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ IOException ioe;
+
+ /* we retry current node only once. So this is set to true only here.
+ * Intention is to handle one common case of an error that is not a
+ * failure on datanode or client : when DataNode closes the connection
+ * since the client is idle. If there are other cases of "non-errors" then
+ * a datanode might be retried by setting this to true again.
+ */
+ boolean retryCurrentNode = true;
+
+ while (true) {
+ // retry as many times as seekToNewSource allows.
+ try {
+ return reader.doRead(blockReader, off, len);
+ } catch ( ChecksumException ce ) {
+ DFSClient.LOG.warn("Found Checksum error for "
+ + getCurrentBlock() + " from " + currentNode
+ + " at " + ce.getPos());
+ ioe = ce;
+ retryCurrentNode = false;
+ // we want to remember which block replicas we have tried
+ addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+ corruptedBlockMap);
+ } catch ( IOException e ) {
+ if (!retryCurrentNode) {
+ DFSClient.LOG.warn("Exception while reading from "
+ + getCurrentBlock() + " of " + src + " from "
+ + currentNode, e);
+ }
+ ioe = e;
+ }
+ boolean sourceFound = false;
+ if (retryCurrentNode) {
+ /* possibly retry the same node so that transient errors don't
+ * result in application level failures (e.g. Datanode could have
+ * closed the connection because the client is idle for too long).
+ */
+ sourceFound = seekToBlockSource(pos);
+ } else {
+ addToDeadNodes(currentNode);
+ sourceFound = seekToNewSource(pos);
+ }
+ if (!sourceFound) {
+ throw ioe;
+ }
+ retryCurrentNode = false;
+ }
+ }
+
+ protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+ dfsClient.checkOpen();
+ if (closed.get()) {
+ throw new IOException("Stream closed");
+ }
+ Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
+ = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
+ failures = 0;
+ if (pos < getFileLength()) {
+ int retries = 2;
+ while (retries > 0) {
+ try {
+ // currentNode can be left as null if previous read had a checksum
+ // error on the same block. See HDFS-3067
+ if (pos > blockEnd || currentNode == null) {
+ currentNode = blockSeekTo(pos);
+ }
+ int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+ synchronized(infoLock) {
+ if (locatedBlocks.isLastBlockComplete()) {
+ realLen = (int) Math.min(realLen,
+ locatedBlocks.getFileLength() - pos);
+ }
+ }
+ int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
+
+ if (result >= 0) {
+ pos += result;
+ } else {
+ // got an EOS from the reader though we expected more data from it.
+ throw new IOException("Unexpected EOS from the reader");
+ }
+ if (dfsClient.stats != null) {
+ dfsClient.stats.incrementBytesRead(result);
+ }
+ return result;
+ } catch (ChecksumException ce) {
+ throw ce;
+ } catch (IOException e) {
+ if (retries == 1) {
+ DFSClient.LOG.warn("DFS Read", e);
+ }
+ blockEnd = -1;
+ if (currentNode != null) { addToDeadNodes(currentNode); }
+ if (--retries == 0) {
+ throw e;
+ }
+ } finally {
+ // Check if we need to report block replica corruption, whether the
+ // read was successful or a ChecksumException occurred.
+ reportCheckSumFailure(corruptedBlockMap,
+ currentLocatedBlock.getLocations().length);
+ }
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Read the entire buffer.
+ */
+ @Override
+ public synchronized int read(final byte buf[], int off, int len) throws IOException {
+ ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
+ try {
+ return readWithStrategy(byteArrayReader, off, len);
+ } finally {
+ scope.close();
+ }
+ }
+
+ @Override
+ public synchronized int read(final ByteBuffer buf) throws IOException {
+ ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+ try {
+ return readWithStrategy(byteBufferReader, 0, buf.remaining());
+ } finally {
+ scope.close();
+ }
+ }
+
+
+ /**
+ * Add corrupted block replica into map.
+ */
+ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ Set<DatanodeInfo> dnSet = null;
+ if((corruptedBlockMap.containsKey(blk))) {
+ dnSet = corruptedBlockMap.get(blk);
+ }else {
+ dnSet = new HashSet<DatanodeInfo>();
+ }
+ if (!dnSet.contains(node)) {
+ dnSet.add(node);
+ corruptedBlockMap.put(blk, dnSet);
+ }
+ }
+
+ private DNAddrPair chooseDataNode(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) throws IOException {
+ while (true) {
+ DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
+ if (result != null) {
+ return result;
+ } else {
+ String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+ deadNodes, ignoredNodes);
+ String blockInfo = block.getBlock() + " file=" + src;
+ if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
+ String description = "Could not obtain block: " + blockInfo;
+ DFSClient.LOG.warn(description + errMsg
+ + ". Throwing a BlockMissingException");
+ throw new BlockMissingException(src, description,
+ block.getStartOffset());
+ }
+
+ DatanodeInfo[] nodes = block.getLocations();
+ if (nodes == null || nodes.length == 0) {
+ DFSClient.LOG.info("No node available for " + blockInfo);
+ }
+ DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ + " from any node: " + errMsg
+ + ". Will get new block locations from namenode and retry...");
+ try {
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // The first time we get a BlockMissingException, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we will wait a 3000 ms grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 6000 ms
+ // to alleviate the request rate on the server. Similarly the 3rd retry
+ // will wait a 6000 ms grace period before retrying, and the waiting
+ // window is expanded to 9000 ms.
+ final int timeWindow = dfsClient.getConf().getTimeWindow();
+ double waitTime = timeWindow * failures + // grace period for the last round of attempt
+ // expanding time window for each failure
+ timeWindow * (failures + 1) *
+ ThreadLocalRandom.current().nextDouble();
+ DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+ Thread.sleep((long)waitTime);
+ } catch (InterruptedException iex) {
+ }
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+ openInfo(true);
+ block = refreshLocatedBlock(block);
+ failures++;
+ }
+ }
+ }
+
+ /**
+ * Get the best node from which to stream the data.
+ * @param block LocatedBlock, containing nodes in priority order.
+ * @param ignoredNodes Do not choose nodes in this array (may be null)
+ * @return The DNAddrPair of the best node. Null if no node can be chosen.
+ */
+ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+ Collection<DatanodeInfo> ignoredNodes) {
+ DatanodeInfo[] nodes = block.getLocations();
+ StorageType[] storageTypes = block.getStorageTypes();
+ DatanodeInfo chosenNode = null;
+ StorageType storageType = null;
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ if (!deadNodes.containsKey(nodes[i])
+ && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+ chosenNode = nodes[i];
+ // Storage types are ordered to correspond with nodes, so use the same
+ // index to get storage type.
+ if (storageTypes != null && i < storageTypes.length) {
+ storageType = storageTypes[i];
+ }
+ break;
+ }
+ }
+ }
+ if (chosenNode == null) {
+ DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
+ " after checking nodes = " + Arrays.toString(nodes) +
+ ", ignoredNodes = " + ignoredNodes);
+ return null;
+ }
+ final String dnAddr =
+ chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+ }
+ InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+ return new DNAddrPair(chosenNode, targetAddr, storageType);
+ }
+
+ private static String getBestNodeDNAddrPairErrorString(
+ DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
+ DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
+ StringBuilder errMsgr = new StringBuilder(
+ " No live nodes contain current block ");
+ errMsgr.append("Block locations:");
+ for (DatanodeInfo datanode : nodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ errMsgr.append(" Dead nodes: ");
+ for (DatanodeInfo datanode : deadNodes.keySet()) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ if (ignoredNodes != null) {
+ errMsgr.append(" Ignored nodes: ");
+ for (DatanodeInfo datanode : ignoredNodes) {
+ errMsgr.append(" ");
+ errMsgr.append(datanode.toString());
+ }
+ }
+ return errMsgr.toString();
+ }
+
+ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
+ byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ block = refreshLocatedBlock(block);
+ while (true) {
+ DNAddrPair addressPair = chooseDataNode(block, null);
+ try {
+ actualGetFromOneDataNode(addressPair, block, start, end,
+ buf, offset, corruptedBlockMap);
+ return;
+ } catch (IOException e) {
+ // Ignore. Already processed inside the function.
+ // Loop through to try the next node.
+ }
+ }
+ }
+
+ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
+ final LocatedBlock block, final long start, final long end,
+ final ByteBuffer bb,
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final int hedgedReadId) {
+ final Span parentSpan = Trace.currentSpan();
+ return new Callable<ByteBuffer>() {
+ @Override
+ public ByteBuffer call() throws Exception {
+ byte[] buf = bb.array();
+ int offset = bb.position();
+ TraceScope scope =
+ Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
+ try {
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
+ offset, corruptedBlockMap);
+ return bb;
+ } finally {
+ scope.close();
+ }
+ }
+ };
+ }
+
+ /**
+ * Used when reading contiguous blocks
+ */
+ private void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long start, final long end, byte[] buf,
+ int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ final int length = (int) (end - start + 1);
+ actualGetFromOneDataNode(datanode, block, start, end, buf,
+ new int[]{offset}, new int[]{length}, corruptedBlockMap);
+ }
+
+ /**
+ * Read data from one DataNode.
+ * @param datanode the datanode from which to read data
+ * @param block the located block containing the requested data
+ * @param startInBlk the offset within the block where the read starts
+ * @param endInBlk the offset within the block where the read ends (inclusive)
+ * @param buf the given byte array into which the data is read
+ * @param offsets the data may be read into multiple segments of the buf
+ * (when reading a striped block). This array indicates the
+ * offset of each buf segment.
+ * @param lengths the length of each buf segment
+ * @param corruptedBlockMap map recording list of datanodes with corrupted
+ * block replica
+ */
+ void actualGetFromOneDataNode(final DNAddrPair datanode,
+ LocatedBlock block, final long startInBlk, final long endInBlk,
+ byte[] buf, int[] offsets, int[] lengths,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ DFSClientFaultInjector.get().startFetchFromDatanode();
+ int refetchToken = 1; // only need to get a new access token once
+ int refetchEncryptionKey = 1; // only need to get a new encryption key once
+ final int len = (int) (endInBlk - startInBlk + 1);
+ checkReadPortions(offsets, lengths, len);
+
+ while (true) {
+ // cached block locations may have been updated by chooseDataNode()
+ // or fetchBlockAt(). Always get the latest list of locations at the
+ // start of the loop.
+ block = refreshLocatedBlock(block);
+ BlockReader reader = null;
+ try {
+ DFSClientFaultInjector.get().fetchFromDatanodeException();
+ reader = getBlockReader(block, startInBlk, len, datanode.addr,
+ datanode.storageType, datanode.info);
+ for (int i = 0; i < offsets.length; i++) {
+ int nread = reader.readAll(buf, offsets[i], lengths[i]);
+ updateReadStatistics(readStatistics, nread, reader);
+ if (nread != lengths[i]) {
+ throw new IOException("truncated return from reader.read(): " +
+ "excpected " + lengths[i] + ", got " + nread);
+ }
+ }
+ DFSClientFaultInjector.get().readFromDatanodeDelay();
+ return;
+ } catch (ChecksumException e) {
+ String msg = "fetchBlockByteRange(). Got a checksum exception for "
+ + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
+ + datanode.info;
+ DFSClient.LOG.warn(msg);
+ // we want to remember what we have tried
+ addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
+ corruptedBlockMap);
+ addToDeadNodes(datanode.info);
+ throw new IOException(msg);
+ } catch (IOException e) {
+ if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to " + datanode.addr
+ + " : " + e);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
+ refetchToken--;
+ try {
+ fetchBlockAt(block.getStartOffset());
+ } catch (IOException fbae) {
+ // ignore IOE, since we can retry it later in a loop
+ }
+ } else {
+ String msg = "Failed to connect to " + datanode.addr + " for file "
+ + src + " for block " + block.getBlock() + ":" + e;
+ DFSClient.LOG.warn("Connection failure: " + msg, e);
+ addToDeadNodes(datanode.info);
+ throw new IOException(msg);
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Refresh cached block locations.
+ * @param block The currently cached block locations
+ * @return Refreshed block locations
+ * @throws IOException
+ */
+ protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+ throws IOException {
+ return getBlockAt(block.getStartOffset());
+ }
+
+ /**
+ * This method verifies that the read portions are valid and do not overlap
+ * with each other.
+ */
+ private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
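+ // Each segment must start at or after the end of the previous segment,
+ // and the segment lengths must sum to exactly totalLen.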
+ Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+ int sum = 0;
+ for (int i = 0; i < lengths.length; i++) {
+ if (i > 0) {
+ int gap = offsets[i] - offsets[i - 1];
+ // make sure read portions do not overlap with each other
+ Preconditions.checkArgument(gap >= lengths[i - 1]);
+ }
+ sum += lengths[i];
+ }
+ Preconditions.checkArgument(sum == totalLen);
+ }
+
+ /**
+ * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+ * 'hedged' read if the first read is taking longer than the configured amount
+ * of time. We then wait on whichever read returns first.
+ */
+ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
+ long end, byte[] buf, int offset,
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+ throws IOException {
+ final DfsClientConf conf = dfsClient.getConf();
+ ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
+ CompletionService<ByteBuffer> hedgedService =
+ new ExecutorCompletionService<ByteBuffer>(
+ dfsClient.getHedgedReadsThreadPool());
+ ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
+ ByteBuffer bb = null;
+ int len = (int) (end - start + 1);
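+ // Id assigned to each read attempt submitted below; incremented per request.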
+ int hedgedReadId = 0;
+ block = refreshLocatedBlock(block);
+ while (true) {
+ // see HDFS-6591, this metric is used to verify/catch unnecessary loops
+ hedgedReadOpsLoopNumForTesting++;
+ DNAddrPair chosenNode = null;
+ // there is no request already executing.
+ if (futures.isEmpty()) {
+ // chooseDataNode is a commitment. If no node, we go to
+ // the NN to re-fetch block locations. We only go here on the first read.
+ chosenNode = chooseDataNode(block, ignored);
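+ // The first request reads directly into the caller's buffer.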
+ bb = ByteBuffer.wrap(buf, offset, len);
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+ chosenNode, block, start, end, bb,
+ corruptedBlockMap, hedgedReadId++);
+ Future<ByteBuffer> firstRequest = hedgedService
+ .submit(getFromDataNodeCallable);
+ futures.add(firstRequest);
+ try {
+ Future<ByteBuffer> future = hedgedService.poll(
+ conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
+ if (future != null) {
+ future.get();
+ return;
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
+ + "ms to read from " + chosenNode.info
+ + "; spawning hedged read");
+ }
+ // Ignore this node on next go around.
+ ignored.add(chosenNode.info);
+ dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+ continue; // no need to refresh block locations
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (ExecutionException e) {
+ // Ignore already logged in the call.
+ }
+ } else {
+ // We are starting up a 'hedged' read. We have a read already
+ // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
+ // If no nodes to do hedged reads against, pass.
+ try {
+ chosenNode = getBestNodeDNAddrPair(block, ignored);
+ if (chosenNode == null) {
+ chosenNode = chooseDataNode(block, ignored);
+ }
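+ // Hedged requests read into a separate buffer; if one of them wins,
+ // the data is copied back into the caller's buffer below.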
+ bb = ByteBuffer.allocate(len);
+ Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
+ chosenNode, block, start, end, bb,
+ corruptedBlockMap, hedgedReadId++);
+ Future<ByteBuffer> oneMoreRequest = hedgedService
+ .submit(getFromDataNodeCallable);
+ futures.add(oneMoreRequest);
+ } catch (IOException ioe) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Failed getting node for hedged read: "
+ + ioe.getMessage());
+ }
+ }
+ // Whether or not an extra request was submitted, wait for a result from
+ // whichever outstanding read completes first, then cancel the rest.
+ try {
+ ByteBuffer result = getFirstToComplete(hedgedService, futures);
+ // cancel the rest.
+ cancelAll(futures);
+ if (result.array() != buf) { // compare the array pointers
+ dfsClient.getHedgedReadMetrics().incHedgedReadWins();
+ System.arraycopy(result.array(), result.position(), buf, offset,
+ len);
+ } else {
+ dfsClient.getHedgedReadMetrics().incHedgedReadOps();
+ }
+ return;
+ } catch (InterruptedException ie) {
+ // Ignore and retry
+ }
+ // We only get here on exception. Ignore this node on the next go-around
+ // IFF we found a chosenNode to hedge the read against.
+ if (chosenNode != null && chosenNode.info != null) {
+ ignored.add(chosenNode.info);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public long getHedgedReadOpsLoopNumForTesting() {
+ return hedgedReadOpsLoopNumForTesting;
+ }
+
+ private ByteBuffer getFirstToComplete(
+ CompletionService<ByteBuffer> hedgedService,
+ ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
+ if (futures.isEmpty()) {
+ throw new InterruptedException("let's retry");
+ }
+ Future<ByteBuffer> future = null;
+ try {
+ future = hedgedService.take();
+ ByteBuffer bb = future.get();
+ futures.remove(future);
+ return bb;
+ } catch (ExecutionException e) {
+ // already logged in the Callable
+ futures.remove(future);
+ } catch (CancellationException ce) {
+ // already logged in the Callable
+ futures.remove(future);
+ }
+
+ throw new InterruptedException("let's retry");
+ }
+
+ private void cancelAll(List<Future<ByteBuffer>> futures) {
+ for (Future<ByteBuffer> future : futures) {
+ // Unfortunately, hdfs reads do not take kindly to interruption.
+ // Threads return a variety of interrupted-type exceptions but
+ // also complaints about invalid pbs -- likely because the read
+ // is interrupted before it gets a whole pb. There is also verbose WARN
+ // logging. So, for now, do not interrupt the running read.
+ future.cancel(false);
+ }
+ }
+
+ /**
+ * Should the block access token be refetched on an exception?
+ *
+ * @param ex Exception received
+ * @param targetAddr Target datanode address from which the exception was received
+ * @return true if the block access token has expired or is invalid and it
+ * should be refetched
+ */
+ protected static boolean tokenRefetchNeeded(IOException ex,
+ InetSocketAddress targetAddr) {
+ /*
+ * Get a new access token and retry. Retry is needed in 2 cases. 1)
+ * When both NN and DN re-started while DFSClient holding a cached
+ * access token. 2) In the case that NN fails to update its
+ * access key at pre-set interval (by a wide margin) and
+ * subsequently restarts. In this case, DN re-registers itself with
+ * NN and receives a new access key, but DN will delete the old
+ * access key from its memory since it's considered expired based on
+ * the estimated expiration date.
+ */
+ if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+ DFSClient.LOG.info("Access token was invalid when connecting to "
+ + targetAddr + " : " + ex);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Read bytes starting from the specified position.
+ *
+ * @param position start read from this position
+ * @param buffer read buffer
+ * @param offset offset into buffer
+ * @param length number of bytes to read
+ *
+ * @return actual number of bytes read
+ */
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src);
+ try {
+ return pread(position, buffer, offset, length);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private int pread(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ // sanity checks
+ dfsClient.checkOpen();
+ if (closed.get()) {
+ throw new IOException("Stream closed");
+ }
+ failures = 0;
+ long filelen = getFileLength();
+ if ((position < 0) || (position >= filelen)) {
+ return -1;
+ }
+ int realLen = length;
+ if ((position + length) > filelen) {
+ realLen = (int)(filelen - position);
+ }
+
+ // determine the block and byte range within the block
+ // corresponding to position and realLen
+ List<LocatedBlock> blockRange = getBlockRange(position, realLen);
+ int remaining = realLen;
+ Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
+ = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
+ for (LocatedBlock blk : blockRange) {
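+ // Offset within this block at which to start reading.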
+ long targetStart = position - blk.getStartOffset();
+ long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
+ try {
+ if (dfsClient.isHedgedReadsEnabled()) {
+ hedgedFetchBlockByteRange(blk, targetStart,
+ targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+ } else {
+ fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+ buffer, offset, corruptedBlockMap);
+ }
+ } finally {
+ // Check and report if any block replicas are corrupted.
+ // BlockMissingException may be caught if all block replicas are
+ // corrupted.
+ reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
+ }
+
+ remaining -= bytesToRead;
+ position += bytesToRead;
+ offset += bytesToRead;
+ }
+ assert remaining == 0 : "Wrong number of bytes read.";
+ if (dfsClient.stats != null) {
+ dfsClient.stats.incrementBytesRead(realLen);
+ }
+ return realLen;
+ }
+
+ /**
+ * DFSInputStream reports checksum failure.
+ * Case I : The client has tried multiple data nodes and at least one of the
+ * attempts has succeeded. We report the other failures as corrupted blocks
+ * to the namenode.
+ * Case II: The client has tried out all data nodes, but all failed. We
+ * only report if the total number of replicas is 1. We do not
+ * report otherwise since the failure may be due to the client itself being
+ * unable to read.
+ * @param corruptedBlockMap map of corrupted blocks
+ * @param dataNodeCount number of data nodes that contain the block replicas
+ */
+ protected void reportCheckSumFailure(
+ Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ int dataNodeCount) {
+ if (corruptedBlockMap.isEmpty()) {
+ return;
+ }
+ Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
+ .entrySet().iterator();
+ Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
+ ExtendedBlock blk = entry.getKey();
+ Set<DatanodeInfo> dnSet = entry.getValue();
+ if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
+ || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
+ DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
+ int i = 0;
+ for (DatanodeInfo dn:dnSet) {
+ locs[i++] = dn;
+ }
+ LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
+ dfsClient.reportChecksumFailure(src, lblocks);
+ }
+ corruptedBlockMap.clear();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if ( n > 0 ) {
+ long curPos = getPos();
+ long fileLen = getFileLength();
+ if( n+curPos > fileLen ) {
+ n = fileLen - curPos;
+ }
+ seek(curPos+n);
+ return n;
+ }
+ return n < 0 ? -1 : 0;
+ }
+
+ /**
+ * Seek to a new arbitrary location
+ */
+ @Override
+ public synchronized void seek(long targetPos) throws IOException {
+ if (targetPos > getFileLength()) {
+ throw new EOFException("Cannot seek after EOF");
+ }
+ if (targetPos < 0) {
+ throw new EOFException("Cannot seek to negative offset");
+ }
+ if (closed.get()) {
+ throw new IOException("Stream is closed!");
+ }
+ boolean done = false;
+ if (pos <= targetPos && targetPos <= blockEnd) {
+ //
+ // If this seek is to a forward position within the current
+ // block, and this piece of data might already be lying in
+ // the TCP buffer, then just eat up the intervening data.
+ //
+ int diff = (int)(targetPos - pos);
+ if (diff <= blockReader.available()) {
+ try {
+ pos += blockReader.skip(diff);
+ if (pos == targetPos) {
+ done = true;
+ } else {
+ // The range was already checked. If the block reader returns
+ // something unexpected instead of throwing an exception, it is
+ // most likely a bug.
+ String errMsg = "BlockReader failed to seek to " +
+ targetPos + ". Instead, it seeked to " + pos + ".";
+ DFSClient.LOG.warn(errMsg);
+ throw new IOException(errMsg);
+ }
+ } catch (IOException e) {//make following read to retry
+ if(DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Exception while seek to " + targetPos
+ + " from " + getCurrentBlock() + " of " + src + " from "
+ + currentNode, e);
+ }
+ }
+ }
+ }
+ if (!done) {
+ pos = targetPos;
+ blockEnd = -1;
+ }
+ }
+
+ /**
+ * Same as {@link #seekToNewSource(long)} except that it does not exclude
+ * the current datanode and might connect to the same node.
+ */
+ private boolean seekToBlockSource(long targetPos)
+ throws IOException {
+ currentNode = blockSeekTo(targetPos);
+ return true;
+ }
+
+ /**
+ * Seek to given position on a node other than the current node. If
+ * a node other than the current node is found, then returns true.
+ * If another node could not be found, then returns false.
+ */
+ @Override
+ public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+ if (currentNode == null) {
+ return seekToBlockSource(targetPos);
+ }
+ boolean markedDead = deadNodes.containsKey(currentNode);
+ addToDeadNodes(currentNode);
+ DatanodeInfo oldNode = currentNode;
+ DatanodeInfo newNode = blockSeekTo(targetPos);
+ if (!markedDead) {
+ /* remove it from deadNodes. blockSeekTo could have cleared
+ * deadNodes and added currentNode again. That's ok. */
+ deadNodes.remove(oldNode);
+ }
+ if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
+ currentNode = newNode;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Return the current position in the stream.
+ */
+ @Override
+ public synchronized long getPos() {
+ return pos;
+ }
+
+ /** Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Stream closed");
+ }
+
+ final long remaining = getFileLength() - pos;
+ return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
+ }
+
+ /**
+ * We definitely don't support marks
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+ @Override
+ public void mark(int readLimit) {
+ }
+ @Override
+ public void reset() throws IOException {
+ throw new IOException("Mark/reset not supported");
+ }
+
+ /** Utility class to encapsulate data node info and its address. */
+ static final class DNAddrPair {
+ final DatanodeInfo info;
+ final InetSocketAddress addr;
+ final StorageType storageType;
+
+ DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
+ StorageType storageType) {
+ this.info = info;
+ this.addr = addr;
+ this.storageType = storageType;
+ }
+ }
+
+ /**
+ * Get statistics about the reads which this DFSInputStream has done.
+ */
+ public ReadStatistics getReadStatistics() {
+ synchronized(infoLock) {
+ return new ReadStatistics(readStatistics);
+ }
+ }
+
+ /**
+ * Clear statistics about the reads which this DFSInputStream has done.
+ */
+ public void clearReadStatistics() {
+ synchronized(infoLock) {
+ readStatistics.clear();
+ }
+ }
+
+ public FileEncryptionInfo getFileEncryptionInfo() {
+ synchronized(infoLock) {
+ return fileEncryptionInfo;
+ }
+ }
+
+ protected void closeCurrentBlockReaders() {
+ if (blockReader == null) return;
+ // Close the current block reader so that the new caching settings can
+ // take effect immediately.
+ try {
+ blockReader.close();
+ } catch (IOException e) {
+ DFSClient.LOG.error("error closing blockReader", e);
+ }
+ blockReader = null;
+ blockEnd = -1;
+ }
+
+ @Override
+ public synchronized void setReadahead(Long readahead)
+ throws IOException {
+ synchronized (infoLock) {
+ this.cachingStrategy =
+ new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
+ }
+ closeCurrentBlockReaders();
+ }
+
+ @Override
+ public synchronized void setDropBehind(Boolean dropBehind)
+ throws IOException {
+ synchronized (infoLock) {
+ this.cachingStrategy =
+ new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
+ }
+ closeCurrentBlockReaders();
+ }
+
+ /**
+ * The immutable empty buffer we return when we reach EOF when doing a
+ * zero-copy read.
+ */
+ private static final ByteBuffer EMPTY_BUFFER =
+ ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+
+ @Override
+ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+ int maxLength, EnumSet<ReadOption> opts)
+ throws IOException, UnsupportedOperationException {
+ if (maxLength == 0) {
+ return EMPTY_BUFFER;
+ } else if (maxLength < 0) {
+ throw new IllegalArgumentException("can't read a negative " +
+ "number of bytes.");
+ }
+ if ((blockReader == null) || (blockEnd == -1)) {
+ if (pos >= getFileLength()) {
+ return null;
+ }
+ /*
+ * If we don't have a blockReader, or the one we have has no more bytes
+ * left to read, we call seekToBlockSource to get a new blockReader and
+ * recalculate blockEnd. Note that we assume we're not at EOF here
+ * (we check this above).
+ */
+ if ((!seekToBlockSource(pos)) || (blockReader == null)) {
+ throw new IOException("failed to allocate new BlockReader " +
+ "at position " + pos);
+ }
+ }
+ ByteBuffer buffer = null;
+ if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
+ buffer = tryReadZeroCopy(maxLength, opts);
+ }
+ if (buffer != null) {
+ return buffer;
+ }
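+ // Zero-copy was not possible; fall back to a normal read into a pooled buffer.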
+ buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
+ if (buffer != null) {
+ getExtendedReadBuffers().put(buffer, bufferPool);
+ }
+ return buffer;
+ }
+
+ private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
+ EnumSet<ReadOption> opts) throws IOException {
+ // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
+ // JVM to optimize this function.
+ final long curPos = pos;
+ final long curEnd = blockEnd;
+ final long blockStartInFile = currentLocatedBlock.getStartOffset();
+ final long blockPos = curPos - blockStartInFile;
+
+ // Shorten this read if the end of the block is nearby.
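+ // length63 is the read length clamped to the block boundary (still a long).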
+ long length63;
+ if ((curPos + maxLength) <= (curEnd + 1)) {
+ length63 = maxLength;
+ } else {
+ length63 = 1 + curEnd - curPos;
+ if (length63 <= 0) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; " + length63 + " bytes left in block. " +
+ "blockPos=" + blockPos + "; curPos=" + curPos +
+ "; curEnd=" + curEnd);
+ }
+ return null;
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Reducing read length from " + maxLength +
+ " to " + length63 + " to avoid going more than one byte " +
+ "past the end of the block. blockPos=" + blockPos +
+ "; curPos=" + curPos + "; curEnd=" + curEnd);
+ }
+ }
+ // Make sure that we don't go beyond 31-bit offsets in the MappedByteBuffer.
+ int length;
+ if (blockPos + length63 <= Integer.MAX_VALUE) {
+ length = (int)length63;
+ } else {
+ long length31 = Integer.MAX_VALUE - blockPos;
+ if (length31 <= 0) {
+ // Java ByteBuffers can't be longer than 2 GB, because they use
+ // 4-byte signed integers to represent capacity, etc.
+ // So we can't mmap the parts of the block higher than the 2 GB offset.
+ // FIXME: we could work around this with multiple memory maps.
+ // See HDFS-5101.
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
+ "exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
+ }
+ return null;
+ }
+ length = (int)length31;
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("Reducing read length from " + maxLength +
+ " to " + length + " to avoid 31-bit limit. " +
+ "blockPos=" + blockPos + "; curPos=" + curPos +
+ "; curEnd=" + curEnd);
+ }
+ }
+ final ClientMmap clientMmap = blockReader.getClientMmap(opts);
+ if (clientMmap == null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+ "null.");
+ }
+ return null;
+ }
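+ // If anything below fails before the buffer is handed out, release the mmap.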
+ boolean success = false;
+ ByteBuffer buffer;
+ try {
+ seek(curPos + length);
+ buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+ buffer.position((int)blockPos);
+ buffer.limit((int)(blockPos + length));
+ getExtendedReadBuffers().put(buffer, clientMmap);
+ synchronized (infoLock) {
+ readStatistics.addZeroCopyBytes(length);
+ }
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("readZeroCopy read " + length +
+ " bytes from offset " + curPos + " via the zero-copy read " +
+ "path. blockEnd = " + blockEnd);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ IOUtils.closeQuietly(clientMmap);
+ }
+ }
+ return buffer;
+ }
+
+ @Override
+ public synchronized void releaseBuffer(ByteBuffer buffer) {
+ if (buffer == EMPTY_BUFFER) return;
+ Object val = getExtendedReadBuffers().remove(buffer);
+ if (val == null) {
+ throw new IllegalArgumentException("tried to release a buffer " +
+ "that was not created by this stream, " + buffer);
+ }
+ if (val instanceof ClientMmap) {
+ IOUtils.closeQuietly((ClientMmap)val);
+ } else if (val instanceof ByteBufferPool) {
+ ((ByteBufferPool)val).putBuffer(buffer);
+ }
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ closeCurrentBlockReaders();
+ }
+}