Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/26 20:17:13 UTC
[04/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and
related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
deleted file mode 100644
index 5392c66..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * Used for injecting faults in DFSClient and DFSOutputStream tests.
- * Calls into this are no-ops in production code.
- */
-@VisibleForTesting
-@InterfaceAudience.Private
-public class DFSClientFaultInjector {
- public static DFSClientFaultInjector instance = new DFSClientFaultInjector();
- public static AtomicLong exceptionNum = new AtomicLong(0);
-
- public static DFSClientFaultInjector get() {
- return instance;
- }
-
- public boolean corruptPacket() {
- return false;
- }
-
- public boolean uncorruptPacket() {
- return false;
- }
-
- public boolean failPacket() {
- return false;
- }
-
- public void startFetchFromDatanode() {}
-
- public void fetchFromDatanodeException() {}
-
- public void readFromDatanodeDelay() {}
-}
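
For reference, tests exercise this injector by swapping the static instance
with a subclass that overrides the hooks. A minimal sketch (hypothetical test
code, not part of this patch; class and variable names are illustrative):

    // Hypothetical subclass that forces the fail-packet hook on.
    class FailPacketInjector extends DFSClientFaultInjector {
      @Override
      public boolean failPacket() {
        return true; // simulate a dropped packet in DFSOutputStream tests
      }
    }

    // Swap it in around the code under test, then restore the no-op instance.
    DFSClientFaultInjector saved = DFSClientFaultInjector.instance;
    DFSClientFaultInjector.instance = new FailPacketInjector();
    try {
      // ... run the read/write path under test ...
    } finally {
      DFSClientFaultInjector.instance = saved;
    }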
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c9f23f9..0ba174f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
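
The client-side settings that DFSConfigKeys now inherits via
HdfsClientConfigKeys are ordinary client configuration. A minimal sketch of
enabling hedged reads, assuming the standard dfs.client.hedged.read.*
property names:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // A thread pool size greater than zero enables hedged reads.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
    // Spawn a hedged read if the first read is slower than this threshold.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 500);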
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
deleted file mode 100644
index 2a228e8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSHedgedReadMetrics.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * The client-side metrics for the hedged read feature.
- * This class exposes a number of publicly accessible metrics variables
- * so that client-side applications, such as HBase, can read them directly.
- */
-@InterfaceAudience.Private
-public class DFSHedgedReadMetrics {
- public final AtomicLong hedgedReadOps = new AtomicLong();
- public final AtomicLong hedgedReadOpsWin = new AtomicLong();
- public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
-
- public void incHedgedReadOps() {
- hedgedReadOps.incrementAndGet();
- }
-
- public void incHedgedReadOpsInCurThread() {
- hedgedReadOpsInCurThread.incrementAndGet();
- }
-
- public void incHedgedReadWins() {
- hedgedReadOpsWin.incrementAndGet();
- }
-
- public long getHedgedReadOps() {
- return hedgedReadOps.longValue();
- }
-
- public long getHedgedReadOpsInCurThread() {
- return hedgedReadOpsInCurThread.longValue();
- }
-
- public long getHedgedReadWins() {
- return hedgedReadOpsWin.longValue();
- }
-}
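
These counters can be sampled directly from a running client. A sketch,
assuming access to the underlying DFSClient (e.g. via
DistributedFileSystem#getClient()):

    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
    long ops  = metrics.getHedgedReadOps();   // hedged reads started
    long wins = metrics.getHedgedReadWins();  // hedged reads that returned first
    if (ops > 0) {
      System.out.printf("hedged read win rate: %.2f%n", (double) wins / ops);
    }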
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
deleted file mode 100644
index 1f9e3e9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import com.google.common.collect.Iterators;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.inotify.EventBatch;
-import org.apache.hadoop.hdfs.inotify.EventBatchList;
-import org.apache.hadoop.hdfs.inotify.MissingEventsException;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.util.Time;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Stream for reading inotify events. DFSInotifyEventInputStreams should not
- * be shared among multiple threads.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class DFSInotifyEventInputStream {
- public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
- .class);
-
- /**
- * The trace sampler to use when making RPCs to the NameNode.
- */
- private final Sampler<?> traceSampler;
-
- private final ClientProtocol namenode;
- private Iterator<EventBatch> it;
- private long lastReadTxid;
- /**
- * The most recent txid the NameNode told us it has sync'ed -- helps us
- * determine how far behind we are in the edit stream.
- */
- private long syncTxid;
- /**
- * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}.
- */
- private Random rng = new Random();
-
- private static final int INITIAL_WAIT_MS = 10;
-
- DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
- throws IOException {
- // Only consider new transaction IDs.
- this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
- }
-
- DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
- long lastReadTxid) throws IOException {
- this.traceSampler = traceSampler;
- this.namenode = namenode;
- this.it = Iterators.emptyIterator();
- this.lastReadTxid = lastReadTxid;
- }
-
- /**
- * Returns the next batch of events in the stream or null if no new
- * batches are currently available.
- *
- * @throws IOException in case of a network error or edit log
- * corruption. Also possible if JournalNodes are unresponsive in the
- * QJM setting (even one unresponsive JournalNode is enough in rare cases),
- * so catching this exception and retrying at least a few times is
- * recommended.
- * @throws MissingEventsException if we cannot return the next batch in the
- * stream because the data for the events (and possibly some subsequent
- * events) has been deleted (generally because this stream is a very large
- * number of transactions behind the current state of the NameNode). It is
- * safe to continue reading from the stream after this exception is thrown;
- * the next available batch of events will be returned.
- */
- public EventBatch poll() throws IOException, MissingEventsException {
- TraceScope scope =
- Trace.startSpan("inotifyPoll", traceSampler);
- try {
- // need to keep retrying until the NN sends us the latest committed txid
- if (lastReadTxid == -1) {
- LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
- lastReadTxid = namenode.getCurrentEditLogTxid();
- return null;
- }
- if (!it.hasNext()) {
- EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
- if (el.getLastTxid() != -1) {
- // we only want to set syncTxid when we were actually able to read some
- // edits on the NN -- otherwise it will seem like edits are being
- // generated faster than we can read them when the problem is really
- // that we are temporarily unable to read edits
- syncTxid = el.getSyncTxid();
- it = el.getBatches().iterator();
- long formerLastReadTxid = lastReadTxid;
- lastReadTxid = el.getLastTxid();
- if (el.getFirstTxid() != formerLastReadTxid + 1) {
- throw new MissingEventsException(formerLastReadTxid + 1,
- el.getFirstTxid());
- }
- } else {
- LOG.debug("poll(): read no edits from the NN when requesting edits " +
- "after txid {}", lastReadTxid);
- return null;
- }
- }
-
- if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
- // newly seen edit log ops actually got converted to events
- return it.next();
- } else {
- return null;
- }
- } finally {
- scope.close();
- }
- }
-
- /**
- * Return an estimate of how many transaction IDs behind the NameNode's
- * current state this stream is. Clients should periodically call this method
- * and check if its result is steadily increasing, which indicates that they
- * are falling behind (i.e. transactions are being generated faster than the
- * client is reading them). If a client falls too far behind, events may be
- * deleted before the client can read them.
- * <p/>
- * A return value of -1 indicates that an estimate could not be produced, and
- * should be ignored. The value returned by this method is really only useful
- * when compared to previous or subsequent returned values.
- */
- public long getTxidsBehindEstimate() {
- if (syncTxid == 0) {
- return -1;
- } else {
- assert syncTxid >= lastReadTxid;
- // this gives the difference between the last txid we have fetched to the
- // client and syncTxid at the time we last fetched events from the
- // NameNode
- return syncTxid - lastReadTxid;
- }
- }
-
- /**
- * Returns the next event batch in the stream, waiting up to the specified
- * amount of time for a new batch. Returns null if one is not available at the
- * end of the specified amount of time. The time before the method returns may
- * exceed the specified amount of time by up to the time required for an RPC
- * to the NameNode.
- *
- * @param time number of units of the given TimeUnit to wait
- * @param tu the desired TimeUnit
- * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
- * @throws MissingEventsException
- * see {@link DFSInotifyEventInputStream#poll()}
- * @throws InterruptedException if the calling thread is interrupted
- */
- public EventBatch poll(long time, TimeUnit tu) throws IOException,
- InterruptedException, MissingEventsException {
- TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
- EventBatch next = null;
- try {
- long initialTime = Time.monotonicNow();
- long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
- long nextWait = INITIAL_WAIT_MS;
- while ((next = poll()) == null) {
- long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
- if (timeLeft <= 0) {
- LOG.debug("timed poll(): timed out");
- break;
- } else if (timeLeft < nextWait * 2) {
- nextWait = timeLeft;
- } else {
- nextWait *= 2;
- }
- LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
- nextWait);
- Thread.sleep(nextWait);
- }
- } finally {
- scope.close();
- }
- return next;
- }
-
- /**
- * Returns the next batch of events in the stream, waiting indefinitely if
- * a new batch is not immediately available.
- *
- * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
- * @throws MissingEventsException see
- * {@link DFSInotifyEventInputStream#poll()}
- * @throws InterruptedException if the calling thread is interrupted
- */
- public EventBatch take() throws IOException, InterruptedException,
- MissingEventsException {
- TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
- EventBatch next = null;
- try {
- int nextWaitMin = INITIAL_WAIT_MS;
- while ((next = poll()) == null) {
- // sleep for a random period between nextWaitMin and nextWaitMin * 2
- // to avoid stampedes at the NN if there are multiple clients
- int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
- LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
- Thread.sleep(sleepTime);
- // the maximum sleep is 2 minutes
- nextWaitMin = Math.min(60000, nextWaitMin * 2);
- }
- } finally {
- scope.close();
- }
-
- return next;
- }
-}
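
Typical usage of this stream is through HdfsAdmin#getInotifyEventStream(). A
minimal sketch (the nameservice URI is a placeholder; the inotify RPCs
require superuser privileges):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.inotify.Event;
    import org.apache.hadoop.hdfs.inotify.EventBatch;

    public class InotifyTail {
      public static void main(String[] args) throws Exception {
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://nameservice"), new Configuration());
        DFSInotifyEventInputStream stream = admin.getInotifyEventStream();
        while (true) {
          EventBatch batch = stream.take();       // blocks until a batch arrives
          for (Event event : batch.getEvents()) {
            System.out.println(event.getEventType()); // e.g. CREATE, CLOSE, RENAME
          }
        }
      }
    }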
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
deleted file mode 100644
index 139a27c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ /dev/null
@@ -1,1915 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.ByteBufferUtil;
-import org.apache.hadoop.fs.CanSetDropBehind;
-import org.apache.hadoop.fs.CanSetReadahead;
-import org.apache.hadoop.fs.CanUnbuffer;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileEncryptionInfo;
-import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.IdentityHashStore;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/****************************************************************
- * DFSInputStream provides bytes from a named file. It handles
- * negotiation with the namenode and various datanodes as necessary.
- ****************************************************************/
-@InterfaceAudience.Private
-public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess, CanUnbuffer {
- @VisibleForTesting
- public static boolean tcpReadsDisabledForTesting = false;
- private long hedgedReadOpsLoopNumForTesting = 0;
- protected final DFSClient dfsClient;
- protected AtomicBoolean closed = new AtomicBoolean(false);
- protected final String src;
- protected final boolean verifyChecksum;
-
- // state used by stateful read only:
- // (protected by lock on this)
- /////
- private DatanodeInfo currentNode = null;
- protected LocatedBlock currentLocatedBlock = null;
- protected long pos = 0;
- protected long blockEnd = -1;
- private BlockReader blockReader = null;
- ////
-
- // state shared by stateful and positional read:
- // (protected by lock on infoLock)
- ////
- protected LocatedBlocks locatedBlocks = null;
- private long lastBlockBeingWrittenLength = 0;
- private FileEncryptionInfo fileEncryptionInfo = null;
- protected CachingStrategy cachingStrategy;
- ////
-
- protected final ReadStatistics readStatistics = new ReadStatistics();
- // lock for state shared between read and pread
- // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
- // (it's OK to acquire this lock when the lock on <this> is held)
- protected final Object infoLock = new Object();
-
- /**
- * Track the ByteBuffers that we have handed out to readers.
- *
- * The value type can be either ByteBufferPool or ClientMmap, depending on
- * whether this is a memory-mapped buffer or not.
- */
- private IdentityHashStore<ByteBuffer, Object> extendedReadBuffers;
-
- private synchronized IdentityHashStore<ByteBuffer, Object>
- getExtendedReadBuffers() {
- if (extendedReadBuffers == null) {
- extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
- }
- return extendedReadBuffers;
- }
-
- public static class ReadStatistics {
- public ReadStatistics() {
- clear();
- }
-
- public ReadStatistics(ReadStatistics rhs) {
- this.totalBytesRead = rhs.getTotalBytesRead();
- this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
- this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
- this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
- }
-
- /**
- * @return The total bytes read. This will always be at least as
- * high as the other numbers, since it includes all of them.
- */
- public long getTotalBytesRead() {
- return totalBytesRead;
- }
-
- /**
- * @return The total local bytes read. This will always be at least
- * as high as totalShortCircuitBytesRead, since all short-circuit
- * reads are also local.
- */
- public long getTotalLocalBytesRead() {
- return totalLocalBytesRead;
- }
-
- /**
- * @return The total short-circuit local bytes read.
- */
- public long getTotalShortCircuitBytesRead() {
- return totalShortCircuitBytesRead;
- }
-
- /**
- * @return The total number of zero-copy bytes read.
- */
- public long getTotalZeroCopyBytesRead() {
- return totalZeroCopyBytesRead;
- }
-
- /**
- * @return The total number of bytes read which were not local.
- */
- public long getRemoteBytesRead() {
- return totalBytesRead - totalLocalBytesRead;
- }
-
- void addRemoteBytes(long amt) {
- this.totalBytesRead += amt;
- }
-
- void addLocalBytes(long amt) {
- this.totalBytesRead += amt;
- this.totalLocalBytesRead += amt;
- }
-
- void addShortCircuitBytes(long amt) {
- this.totalBytesRead += amt;
- this.totalLocalBytesRead += amt;
- this.totalShortCircuitBytesRead += amt;
- }
-
- void addZeroCopyBytes(long amt) {
- this.totalBytesRead += amt;
- this.totalLocalBytesRead += amt;
- this.totalShortCircuitBytesRead += amt;
- this.totalZeroCopyBytesRead += amt;
- }
-
- void clear() {
- this.totalBytesRead = 0;
- this.totalLocalBytesRead = 0;
- this.totalShortCircuitBytesRead = 0;
- this.totalZeroCopyBytesRead = 0;
- }
-
- private long totalBytesRead;
-
- private long totalLocalBytesRead;
-
- private long totalShortCircuitBytesRead;
-
- private long totalZeroCopyBytesRead;
- }
-
- /**
- * This variable tracks the number of failures since the start of the
- * most recent user-facing operation. That is to say, it should be reset
- * whenever the user makes a call on this stream, and if at any point
- * during the retry logic, the failure count exceeds a threshold,
- * the errors will be thrown back to the operation.
- *
- * Specifically this counts the number of times the client has gone
- * back to the namenode to get a new list of block locations, and is
- * capped at maxBlockAcquireFailures
- */
- protected int failures = 0;
-
- /* XXX Use of ConcurrentHashMap is a temporary fix. Need to fix
- * parallel accesses to DFSInputStream (through preads) properly */
- private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
- new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
-
- private byte[] oneByteBuf; // used for 'int read()'
-
- void addToDeadNodes(DatanodeInfo dnInfo) {
- deadNodes.put(dnInfo, dnInfo);
- }
-
- DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
- LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
- this.dfsClient = dfsClient;
- this.verifyChecksum = verifyChecksum;
- this.src = src;
- synchronized (infoLock) {
- this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
- }
- this.locatedBlocks = locatedBlocks;
- openInfo(false);
- }
-
- /**
- * Grab the open-file info from namenode
- * @param refreshLocatedBlocks whether to re-fetch locatedblocks
- */
- void openInfo(boolean refreshLocatedBlocks) throws IOException,
- UnresolvedLinkException {
- final DfsClientConf conf = dfsClient.getConf();
- synchronized(infoLock) {
- lastBlockBeingWrittenLength =
- fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
- int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
- while (retriesForLastBlockLength > 0) {
- // Getting last block length as -1 is a special case. When cluster
- // restarts, DNs may not report immediately. At this time, partial block
- // locations will not be available on the NN for getting the length. Let's
- // retry a few times to get the length.
- if (lastBlockBeingWrittenLength == -1) {
- DFSClient.LOG.warn("Last block locations not available. "
- + "Datanodes might not have reported blocks completely."
- + " Will retry for " + retriesForLastBlockLength + " times");
- waitFor(conf.getRetryIntervalForGetLastBlockLength());
- lastBlockBeingWrittenLength =
- fetchLocatedBlocksAndGetLastBlockLength(true);
- } else {
- break;
- }
- retriesForLastBlockLength--;
- }
- if (retriesForLastBlockLength == 0) {
- throw new IOException("Could not obtain the last block locations.");
- }
- }
- }
-
- private void waitFor(int waitTime) throws IOException {
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e) {
- throw new IOException(
- "Interrupted while getting the last block length.");
- }
- }
-
- private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
- throws IOException {
- LocatedBlocks newInfo = locatedBlocks;
- if (locatedBlocks == null || refresh) {
- newInfo = dfsClient.getLocatedBlocks(src, 0);
- }
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("newInfo = " + newInfo);
- }
- if (newInfo == null) {
- throw new IOException("Cannot open filename " + src);
- }
-
- if (locatedBlocks != null) {
- Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
- Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
- while (oldIter.hasNext() && newIter.hasNext()) {
- if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
- throw new IOException("Blocklist for " + src + " has changed!");
- }
- }
- }
- locatedBlocks = newInfo;
- long lastBlockBeingWrittenLength = 0;
- if (!locatedBlocks.isLastBlockComplete()) {
- final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
- if (last != null) {
- if (last.getLocations().length == 0) {
- if (last.getBlockSize() == 0) {
- // if the length is zero, then no data has been written to
- // datanode. So no need to wait for the locations.
- return 0;
- }
- return -1;
- }
- final long len = readBlockLength(last);
- last.getBlock().setNumBytes(len);
- lastBlockBeingWrittenLength = len;
- }
- }
-
- fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
-
- return lastBlockBeingWrittenLength;
- }
-
- /** Read the block length from one of the datanodes. */
- private long readBlockLength(LocatedBlock locatedblock) throws IOException {
- assert locatedblock != null : "LocatedBlock cannot be null";
- int replicaNotFoundCount = locatedblock.getLocations().length;
-
- final DfsClientConf conf = dfsClient.getConf();
- for(DatanodeInfo datanode : locatedblock.getLocations()) {
- ClientDatanodeProtocol cdp = null;
-
- try {
- cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
- dfsClient.getConfiguration(), conf.getSocketTimeout(),
- conf.isConnectToDnViaHostname(), locatedblock);
-
- final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
-
- if (n >= 0) {
- return n;
- }
- }
- catch(IOException ioe) {
- if (ioe instanceof RemoteException &&
- (((RemoteException) ioe).unwrapRemoteException() instanceof
- ReplicaNotFoundException)) {
- // special case : replica might not be on the DN, treat as 0 length
- replicaNotFoundCount--;
- }
-
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
- + datanode + " for block " + locatedblock.getBlock(), ioe);
- }
- } finally {
- if (cdp != null) {
- RPC.stopProxy(cdp);
- }
- }
- }
-
- // The namenode told us about these locations, but none of the DNs know
- // about the replica, which means we hit the race between pipeline
- // creation start and end. We require this from all replicas because some
- // other exception could have happened on a DN that has it; we want to
- // report that error.
- if (replicaNotFoundCount == 0) {
- return 0;
- }
-
- throw new IOException("Cannot obtain block length for " + locatedblock);
- }
-
- public long getFileLength() {
- synchronized(infoLock) {
- return locatedBlocks == null? 0:
- locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
- }
- }
-
- // Short circuit local reads are forbidden for files that are
- // under construction. See HDFS-2757.
- boolean shortCircuitForbidden() {
- synchronized(infoLock) {
- return locatedBlocks.isUnderConstruction();
- }
- }
-
- /**
- * Returns the datanode from which the stream is currently reading.
- */
- public synchronized DatanodeInfo getCurrentDatanode() {
- return currentNode;
- }
-
- /**
- * Returns the block containing the target position.
- */
- synchronized public ExtendedBlock getCurrentBlock() {
- if (currentLocatedBlock == null){
- return null;
- }
- return currentLocatedBlock.getBlock();
- }
-
- /**
- * Return collection of blocks that has already been located.
- */
- public List<LocatedBlock> getAllBlocks() throws IOException {
- return getBlockRange(0, getFileLength());
- }
-
- /**
- * Get block at the specified position.
- * Fetch it from the namenode if not cached.
- *
- * @param offset block corresponding to this offset in file is returned
- * @return located block
- * @throws IOException
- */
- protected LocatedBlock getBlockAt(long offset) throws IOException {
- synchronized(infoLock) {
- assert (locatedBlocks != null) : "locatedBlocks is null";
-
- final LocatedBlock blk;
-
- //check offset
- if (offset < 0 || offset >= getFileLength()) {
- throw new IOException("offset < 0 || offset >= getFileLength(), offset="
- + offset
- + ", locatedBlocks=" + locatedBlocks);
- }
- else if (offset >= locatedBlocks.getFileLength()) {
- // offset to the portion of the last block,
- // which is not known to the name-node yet;
- // getting the last block
- blk = locatedBlocks.getLastLocatedBlock();
- }
- else {
- // search cached blocks first
- int targetBlockIdx = locatedBlocks.findBlock(offset);
- if (targetBlockIdx < 0) { // block is not cached
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
- // fetch more blocks
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
- assert (newBlocks != null) : "Could not find target position " + offset;
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
- }
- blk = locatedBlocks.get(targetBlockIdx);
- }
- return blk;
- }
- }
-
- /** Fetch a block from namenode and cache it */
- protected void fetchBlockAt(long offset) throws IOException {
- synchronized(infoLock) {
- int targetBlockIdx = locatedBlocks.findBlock(offset);
- if (targetBlockIdx < 0) { // block is not cached
- targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
- }
- // fetch blocks
- final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
- if (newBlocks == null) {
- throw new IOException("Could not find target position " + offset);
- }
- locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
- }
- }
-
- /**
- * Get blocks in the specified range.
- * Fetch them from the namenode if not cached. This function
- * will not serve a read request beyond the EOF.
- * @param offset starting offset in file
- * @param length length of data
- * @return consecutive segment of located blocks
- * @throws IOException
- */
- private List<LocatedBlock> getBlockRange(long offset,
- long length) throws IOException {
- // getFileLength(): returns total file length
- // locatedBlocks.getFileLength(): returns length of completed blocks
- if (offset >= getFileLength()) {
- throw new IOException("Offset: " + offset +
- " exceeds file length: " + getFileLength());
- }
- synchronized(infoLock) {
- final List<LocatedBlock> blocks;
- final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
- final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
- final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
-
- if (readOffsetWithinCompleteBlk) {
- //get the blocks of finalized (completed) block range
- blocks = getFinalizedBlockRange(offset,
- Math.min(length, lengthOfCompleteBlk - offset));
- } else {
- blocks = new ArrayList<LocatedBlock>(1);
- }
-
- // get the blocks from incomplete block range
- if (readLengthPastCompleteBlk) {
- blocks.add(locatedBlocks.getLastLocatedBlock());
- }
-
- return blocks;
- }
- }
-
- /**
- * Get blocks in the specified range.
- * Includes only the complete blocks.
- * Fetch them from the namenode if not cached.
- */
- private List<LocatedBlock> getFinalizedBlockRange(
- long offset, long length) throws IOException {
- synchronized(infoLock) {
- assert (locatedBlocks != null) : "locatedBlocks is null";
- List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
- // search cached blocks first
- int blockIdx = locatedBlocks.findBlock(offset);
- if (blockIdx < 0) { // block is not cached
- blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
- }
- long remaining = length;
- long curOff = offset;
- while(remaining > 0) {
- LocatedBlock blk = null;
- if(blockIdx < locatedBlocks.locatedBlockCount())
- blk = locatedBlocks.get(blockIdx);
- if (blk == null || curOff < blk.getStartOffset()) {
- LocatedBlocks newBlocks;
- newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
- locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
- continue;
- }
- assert curOff >= blk.getStartOffset() : "Block not found";
- blockRange.add(blk);
- long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
- remaining -= bytesRead;
- curOff += bytesRead;
- blockIdx++;
- }
- return blockRange;
- }
- }
-
- /**
- * Open a DataInputStream to a DataNode so that it can be read from.
- * We get block ID and the IDs of the destinations at startup, from the namenode.
- */
- private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
- if (target >= getFileLength()) {
- throw new IOException("Attempted to read past end of file");
- }
-
- // Will be getting a new BlockReader.
- closeCurrentBlockReaders();
-
- //
- // Connect to best DataNode for desired Block, with potential offset
- //
- DatanodeInfo chosenNode = null;
- int refetchToken = 1; // only need to get a new access token once
- int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
- boolean connectFailedOnce = false;
-
- while (true) {
- //
- // Compute desired block
- //
- LocatedBlock targetBlock = getBlockAt(target);
-
- // update current position
- this.pos = target;
- this.blockEnd = targetBlock.getStartOffset() +
- targetBlock.getBlockSize() - 1;
- this.currentLocatedBlock = targetBlock;
-
- long offsetIntoBlock = target - targetBlock.getStartOffset();
-
- DNAddrPair retval = chooseDataNode(targetBlock, null);
- chosenNode = retval.info;
- InetSocketAddress targetAddr = retval.addr;
- StorageType storageType = retval.storageType;
-
- try {
- blockReader = getBlockReader(targetBlock, offsetIntoBlock,
- targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
- storageType, chosenNode);
- if(connectFailedOnce) {
- DFSClient.LOG.info("Successfully connected to " + targetAddr +
- " for " + targetBlock.getBlock());
- }
- return chosenNode;
- } catch (IOException ex) {
- if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- DFSClient.LOG.info("Will fetch a new encryption key and retry, "
- + "encryption key was invalid when connecting to " + targetAddr
- + " : " + ex);
- // The encryption key used is invalid.
- refetchEncryptionKey--;
- dfsClient.clearDataEncryptionKey();
- } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
- refetchToken--;
- fetchBlockAt(target);
- } else {
- connectFailedOnce = true;
- DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
- + ", add to deadNodes and continue. " + ex, ex);
- // Put chosen node into dead list, continue
- addToDeadNodes(chosenNode);
- }
- }
- }
- }
-
- protected BlockReader getBlockReader(LocatedBlock targetBlock,
- long offsetInBlock, long length, InetSocketAddress targetAddr,
- StorageType storageType, DatanodeInfo datanode) throws IOException {
- ExtendedBlock blk = targetBlock.getBlock();
- Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- CachingStrategy curCachingStrategy;
- boolean shortCircuitForbidden;
- synchronized (infoLock) {
- curCachingStrategy = cachingStrategy;
- shortCircuitForbidden = shortCircuitForbidden();
- }
- return new BlockReaderFactory(dfsClient.getConf()).
- setInetSocketAddress(targetAddr).
- setRemotePeerFactory(dfsClient).
- setDatanodeInfo(datanode).
- setStorageType(storageType).
- setFileName(src).
- setBlock(blk).
- setBlockToken(accessToken).
- setStartOffset(offsetInBlock).
- setVerifyChecksum(verifyChecksum).
- setClientName(dfsClient.clientName).
- setLength(length).
- setCachingStrategy(curCachingStrategy).
- setAllowShortCircuitLocalReads(!shortCircuitForbidden).
- setClientCacheContext(dfsClient.getClientContext()).
- setUserGroupInformation(dfsClient.ugi).
- setConfiguration(dfsClient.getConfiguration()).
- build();
- }
-
- /**
- * Close it down!
- */
- @Override
- public synchronized void close() throws IOException {
- if (!closed.compareAndSet(false, true)) {
- DFSClient.LOG.debug("DFSInputStream has been closed already");
- return;
- }
- dfsClient.checkOpen();
-
- if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
- final StringBuilder builder = new StringBuilder();
- extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
- private String prefix = "";
- @Override
- public void accept(ByteBuffer k, Object v) {
- builder.append(prefix).append(k);
- prefix = ", ";
- }
- });
- DFSClient.LOG.warn("closing file " + src + ", but there are still " +
- "unreleased ByteBuffers allocated by read(). " +
- "Please release " + builder.toString() + ".");
- }
- closeCurrentBlockReaders();
- super.close();
- }
-
- @Override
- public synchronized int read() throws IOException {
- if (oneByteBuf == null) {
- oneByteBuf = new byte[1];
- }
- int ret = read( oneByteBuf, 0, 1 );
- return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
- }
-
- /**
- * Wraps different possible read implementations so that readBuffer can be
- * strategy-agnostic.
- */
- interface ReaderStrategy {
- public int doRead(BlockReader blockReader, int off, int len)
- throws ChecksumException, IOException;
-
- /**
- * Copy data from the src ByteBuffer into the read buffer.
- * @param src The src buffer where the data is copied from
- * @param offset Useful only when the ReaderStrategy is based on a byte array.
- * Indicates the offset of the byte array for the copy.
- * @param length Useful only when the ReaderStrategy is based on a byte array.
- * Indicates the length of the data to copy.
- */
- public int copyFrom(ByteBuffer src, int offset, int length);
- }
-
- protected void updateReadStatistics(ReadStatistics readStatistics,
- int nRead, BlockReader blockReader) {
- if (nRead <= 0) return;
- synchronized(infoLock) {
- if (blockReader.isShortCircuit()) {
- readStatistics.addShortCircuitBytes(nRead);
- } else if (blockReader.isLocal()) {
- readStatistics.addLocalBytes(nRead);
- } else {
- readStatistics.addRemoteBytes(nRead);
- }
- }
- }
-
- /**
- * Used to read bytes into a byte[]
- */
- private class ByteArrayStrategy implements ReaderStrategy {
- final byte[] buf;
-
- public ByteArrayStrategy(byte[] buf) {
- this.buf = buf;
- }
-
- @Override
- public int doRead(BlockReader blockReader, int off, int len)
- throws ChecksumException, IOException {
- int nRead = blockReader.read(buf, off, len);
- updateReadStatistics(readStatistics, nRead, blockReader);
- return nRead;
- }
-
- @Override
- public int copyFrom(ByteBuffer src, int offset, int length) {
- ByteBuffer writeSlice = src.duplicate();
- writeSlice.get(buf, offset, length);
- return length;
- }
- }
-
- /**
- * Used to read bytes into a user-supplied ByteBuffer
- */
- protected class ByteBufferStrategy implements ReaderStrategy {
- final ByteBuffer buf;
- ByteBufferStrategy(ByteBuffer buf) {
- this.buf = buf;
- }
-
- @Override
- public int doRead(BlockReader blockReader, int off, int len)
- throws ChecksumException, IOException {
- int oldpos = buf.position();
- int oldlimit = buf.limit();
- boolean success = false;
- try {
- int ret = blockReader.read(buf);
- success = true;
- updateReadStatistics(readStatistics, ret, blockReader);
- if (ret == 0) {
- DFSClient.LOG.warn("zero");
- }
- return ret;
- } finally {
- if (!success) {
- // Reset to original state so that retries work correctly.
- buf.position(oldpos);
- buf.limit(oldlimit);
- }
- }
- }
-
- @Override
- public int copyFrom(ByteBuffer src, int offset, int length) {
- ByteBuffer writeSlice = src.duplicate();
- int remaining = Math.min(buf.remaining(), writeSlice.remaining());
- writeSlice.limit(writeSlice.position() + remaining);
- buf.put(writeSlice);
- return remaining;
- }
- }
-
- /* This is used by regular read() and handles ChecksumExceptions.
- * The name readBuffer() is chosen to imply similarity to readBuffer() in
- * ChecksumFileSystem.
- */
- private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- IOException ioe;
-
- /* We retry the current node only once. So this is set to true only here.
- * The intention is to handle one common case of an error that is not a
- * failure on the datanode or client: when the DataNode closes the connection
- * because the client is idle. If there are other cases of "non-errors" then
- * a datanode might be retried by setting this to true again.
- */
- boolean retryCurrentNode = true;
-
- while (true) {
- // retry as many times as seekToNewSource allows.
- try {
- return reader.doRead(blockReader, off, len);
- } catch ( ChecksumException ce ) {
- DFSClient.LOG.warn("Found Checksum error for "
- + getCurrentBlock() + " from " + currentNode
- + " at " + ce.getPos());
- ioe = ce;
- retryCurrentNode = false;
- // we want to remember which block replicas we have tried
- addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
- corruptedBlockMap);
- } catch ( IOException e ) {
- if (!retryCurrentNode) {
- DFSClient.LOG.warn("Exception while reading from "
- + getCurrentBlock() + " of " + src + " from "
- + currentNode, e);
- }
- ioe = e;
- }
- boolean sourceFound = false;
- if (retryCurrentNode) {
- /* possibly retry the same node so that transient errors don't
- * result in application level failures (e.g. Datanode could have
- * closed the connection because the client is idle for too long).
- */
- sourceFound = seekToBlockSource(pos);
- } else {
- addToDeadNodes(currentNode);
- sourceFound = seekToNewSource(pos);
- }
- if (!sourceFound) {
- throw ioe;
- }
- retryCurrentNode = false;
- }
- }
-
- protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
- dfsClient.checkOpen();
- if (closed.get()) {
- throw new IOException("Stream closed");
- }
- Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
- = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
- failures = 0;
- if (pos < getFileLength()) {
- int retries = 2;
- while (retries > 0) {
- try {
- // currentNode can be left as null if previous read had a checksum
- // error on the same block. See HDFS-3067
- if (pos > blockEnd || currentNode == null) {
- currentNode = blockSeekTo(pos);
- }
- int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
- synchronized(infoLock) {
- if (locatedBlocks.isLastBlockComplete()) {
- realLen = (int) Math.min(realLen,
- locatedBlocks.getFileLength() - pos);
- }
- }
- int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
-
- if (result >= 0) {
- pos += result;
- } else {
- // got an EOS from the reader though we expect more data on it.
- throw new IOException("Unexpected EOS from the reader");
- }
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(result);
- }
- return result;
- } catch (ChecksumException ce) {
- throw ce;
- } catch (IOException e) {
- if (retries == 1) {
- DFSClient.LOG.warn("DFS Read", e);
- }
- blockEnd = -1;
- if (currentNode != null) { addToDeadNodes(currentNode); }
- if (--retries == 0) {
- throw e;
- }
- } finally {
- // Check if we need to report block replica corruption, whether the
- // read was successful or a ChecksumException occurred.
- reportCheckSumFailure(corruptedBlockMap,
- currentLocatedBlock.getLocations().length);
- }
- }
- }
- return -1;
- }
-
- /**
- * Read the entire buffer.
- */
- @Override
- public synchronized int read(final byte buf[], int off, int len) throws IOException {
- ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
- TraceScope scope =
- dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
- try {
- return readWithStrategy(byteArrayReader, off, len);
- } finally {
- scope.close();
- }
- }
-
- @Override
- public synchronized int read(final ByteBuffer buf) throws IOException {
- ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
- TraceScope scope =
- dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
- try {
- return readWithStrategy(byteBufferReader, 0, buf.remaining());
- } finally {
- scope.close();
- }
- }
-
-
- /**
- * Add corrupted block replica into map.
- */
- protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
- Set<DatanodeInfo> dnSet = null;
- if((corruptedBlockMap.containsKey(blk))) {
- dnSet = corruptedBlockMap.get(blk);
- }else {
- dnSet = new HashSet<DatanodeInfo>();
- }
- if (!dnSet.contains(node)) {
- dnSet.add(node);
- corruptedBlockMap.put(blk, dnSet);
- }
- }
-
- private DNAddrPair chooseDataNode(LocatedBlock block,
- Collection<DatanodeInfo> ignoredNodes) throws IOException {
- while (true) {
- DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
- if (result != null) {
- return result;
- } else {
- String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
- deadNodes, ignoredNodes);
- String blockInfo = block.getBlock() + " file=" + src;
- if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
- String description = "Could not obtain block: " + blockInfo;
- DFSClient.LOG.warn(description + errMsg
- + ". Throwing a BlockMissingException");
- throw new BlockMissingException(src, description,
- block.getStartOffset());
- }
-
- DatanodeInfo[] nodes = block.getLocations();
- if (nodes == null || nodes.length == 0) {
- DFSClient.LOG.info("No node available for " + blockInfo);
- }
- DFSClient.LOG.info("Could not obtain " + block.getBlock()
- + " from any node: " + errMsg
- + ". Will get new block locations from namenode and retry...");
- try {
- // Introducing a random factor to the wait time before another retry.
- // The wait time is dependent on # of failures and a random factor.
- // At the first time of getting a BlockMissingException, the wait time
- // is a random number between 0..3000 ms. If the first retry
- // still fails, we will wait 3000 ms grace period before the 2nd retry.
- // Also at the second retry, the waiting window is expanded to 6000 ms
- // alleviating the request rate from the server. Similarly the 3rd retry
- // will wait 6000ms grace period before retry and the waiting window is
- // expanded to 9000ms.
- final int timeWindow = dfsClient.getConf().getTimeWindow();
- double waitTime = timeWindow * failures + // grace period for the last round of attempt
- // expanding time window for each failure
- timeWindow * (failures + 1) *
- ThreadLocalRandom.current().nextDouble();
- DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
- Thread.sleep((long)waitTime);
- } catch (InterruptedException iex) {
- }
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
- openInfo(true);
- block = refreshLocatedBlock(block);
- failures++;
- }
- }
- }
-
- /**
- * Get the best node from which to stream the data.
- * @param block LocatedBlock, containing nodes in priority order.
- * @param ignoredNodes Do not choose nodes in this array (may be null)
- * @return The DNAddrPair of the best node. Null if no node can be chosen.
- */
- protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
- Collection<DatanodeInfo> ignoredNodes) {
- DatanodeInfo[] nodes = block.getLocations();
- StorageType[] storageTypes = block.getStorageTypes();
- DatanodeInfo chosenNode = null;
- StorageType storageType = null;
- if (nodes != null) {
- for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.containsKey(nodes[i])
- && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
- chosenNode = nodes[i];
- // Storage types are ordered to correspond with nodes, so use the same
- // index to get storage type.
- if (storageTypes != null && i < storageTypes.length) {
- storageType = storageTypes[i];
- }
- break;
- }
- }
- }
- if (chosenNode == null) {
- DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
- " after checking nodes = " + Arrays.toString(nodes) +
- ", ignoredNodes = " + ignoredNodes);
- return null;
- }
- final String dnAddr =
- chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
- }
- InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
- return new DNAddrPair(chosenNode, targetAddr, storageType);
- }
-
- private static String getBestNodeDNAddrPairErrorString(
- DatanodeInfo nodes[], AbstractMap<DatanodeInfo,
- DatanodeInfo> deadNodes, Collection<DatanodeInfo> ignoredNodes) {
- StringBuilder errMsgr = new StringBuilder(
- " No live nodes contain current block ");
- errMsgr.append("Block locations:");
- for (DatanodeInfo datanode : nodes) {
- errMsgr.append(" ");
- errMsgr.append(datanode.toString());
- }
- errMsgr.append(" Dead nodes: ");
- for (DatanodeInfo datanode : deadNodes.keySet()) {
- errMsgr.append(" ");
- errMsgr.append(datanode.toString());
- }
- if (ignoredNodes != null) {
- errMsgr.append(" Ignored nodes: ");
- for (DatanodeInfo datanode : ignoredNodes) {
- errMsgr.append(" ");
- errMsgr.append(datanode.toString());
- }
- }
- return errMsgr.toString();
- }
-
- protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
- byte[] buf, int offset,
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- block = refreshLocatedBlock(block);
- while (true) {
- DNAddrPair addressPair = chooseDataNode(block, null);
- try {
- actualGetFromOneDataNode(addressPair, block, start, end,
- buf, offset, corruptedBlockMap);
- return;
- } catch (IOException e) {
- // Ignore. Already processed inside the function.
- // Loop through to try the next node.
- }
- }
- }
-
- private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
- final LocatedBlock block, final long start, final long end,
- final ByteBuffer bb,
- final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
- final int hedgedReadId) {
- final Span parentSpan = Trace.currentSpan();
- return new Callable<ByteBuffer>() {
- @Override
- public ByteBuffer call() throws Exception {
- byte[] buf = bb.array();
- int offset = bb.position();
- TraceScope scope =
- Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
- try {
- actualGetFromOneDataNode(datanode, block, start, end, buf,
- offset, corruptedBlockMap);
- return bb;
- } finally {
- scope.close();
- }
- }
- };
- }
-
- /**
- * Used when reading contiguous blocks
- */
- private void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long start, final long end, byte[] buf,
- int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- final int length = (int) (end - start + 1);
- actualGetFromOneDataNode(datanode, block, start, end, buf,
- new int[]{offset}, new int[]{length}, corruptedBlockMap);
- }
-
- /**
- * Read data from one DataNode.
- * @param datanode the datanode from which to read data
- * @param block the located block containing the requested data
- * @param startInBlk the startInBlk offset of the block
- * @param endInBlk the endInBlk offset of the block
- * @param buf the given byte array into which the data is read
- * @param offsets the data may be read into multiple segments of the buf
- * (when reading a striped block). this array indicates the
- * offset of each buf segment.
- * @param lengths the length of each buf segment
- * @param corruptedBlockMap map recording list of datanodes with corrupted
- * block replica
- */
- void actualGetFromOneDataNode(final DNAddrPair datanode,
- LocatedBlock block, final long startInBlk, final long endInBlk,
- byte[] buf, int[] offsets, int[] lengths,
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- DFSClientFaultInjector.get().startFetchFromDatanode();
- int refetchToken = 1; // only need to get a new access token once
- int refetchEncryptionKey = 1; // only need to get a new encryption key once
- final int len = (int) (endInBlk - startInBlk + 1);
- checkReadPortions(offsets, lengths, len);
-
- while (true) {
- // cached block locations may have been updated by chooseDataNode()
- // or fetchBlockAt(). Always get the latest list of locations at the
- // start of the loop.
- block = refreshLocatedBlock(block);
- BlockReader reader = null;
- try {
- DFSClientFaultInjector.get().fetchFromDatanodeException();
- reader = getBlockReader(block, startInBlk, len, datanode.addr,
- datanode.storageType, datanode.info);
- for (int i = 0; i < offsets.length; i++) {
- int nread = reader.readAll(buf, offsets[i], lengths[i]);
- updateReadStatistics(readStatistics, nread, reader);
- if (nread != lengths[i]) {
- throw new IOException("truncated return from reader.read(): " +
- "excpected " + lengths[i] + ", got " + nread);
- }
- }
- DFSClientFaultInjector.get().readFromDatanodeDelay();
- return;
- } catch (ChecksumException e) {
- String msg = "fetchBlockByteRange(). Got a checksum exception for "
- + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
- + datanode.info;
- DFSClient.LOG.warn(msg);
- // we want to remember what we have tried
- addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
- corruptedBlockMap);
- addToDeadNodes(datanode.info);
- throw new IOException(msg);
- } catch (IOException e) {
- if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
- DFSClient.LOG.info("Will fetch a new encryption key and retry, "
- + "encryption key was invalid when connecting to " + datanode.addr
- + " : " + e);
- // The encryption key used is invalid.
- refetchEncryptionKey--;
- dfsClient.clearDataEncryptionKey();
- } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
- refetchToken--;
- try {
- fetchBlockAt(block.getStartOffset());
- } catch (IOException fbae) {
- // ignore IOE, since we can retry it later in a loop
- }
- } else {
- String msg = "Failed to connect to " + datanode.addr + " for file "
- + src + " for block " + block.getBlock() + ":" + e;
- DFSClient.LOG.warn("Connection failure: " + msg, e);
- addToDeadNodes(datanode.info);
- throw new IOException(msg);
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- }
- }
-
- /**
- * Refresh cached block locations.
- * @param block The currently cached block locations
- * @return Refreshed block locations
- * @throws IOException
- */
- protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
- throws IOException {
- return getBlockAt(block.getStartOffset());
- }
-
- /**
- * This method verifies that the read portions are valid and do not overlap
- * with each other.
- */
- private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
- Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
- int sum = 0;
- for (int i = 0; i < lengths.length; i++) {
- if (i > 0) {
- int gap = offsets[i] - offsets[i - 1];
- // make sure read portions do not overlap with each other
- Preconditions.checkArgument(gap >= lengths[i - 1]);
- }
- sum += lengths[i];
- }
- Preconditions.checkArgument(sum == totalLen);
- }
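As an aside, the portion check above enforces a simple invariant: the offsets/lengths arrays describe segments of the destination buffer that must be non-empty, must not overlap, and must sum to the total read length. A minimal standalone sketch of that invariant (class and method names are illustrative, not part of this patch):

public class ReadPortionsDemo {
  // Mirrors the checkReadPortions invariant without Guava Preconditions.
  static void check(int[] offsets, int[] lengths, int totalLen) {
    if (offsets.length != lengths.length || offsets.length == 0) {
      throw new IllegalArgumentException("empty or mismatched portions");
    }
    int sum = 0;
    for (int i = 0; i < lengths.length; i++) {
      // Each segment must start at or after the end of the previous one.
      if (i > 0 && offsets[i] - offsets[i - 1] < lengths[i - 1]) {
        throw new IllegalArgumentException("portions overlap at index " + i);
      }
      sum += lengths[i];
    }
    if (sum != totalLen) {
      throw new IllegalArgumentException(
          "portions sum to " + sum + ", expected " + totalLen);
    }
  }

  public static void main(String[] args) {
    check(new int[]{0, 10}, new int[]{10, 5}, 15); // ok: adjacent segments
    check(new int[]{0, 5}, new int[]{10, 5}, 15);  // throws: overlap
  }
}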
-
- /**
- * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
- * 'hedged' read if the first read is taking longer than the configured
- * amount of time. We then wait on whichever read returns first.
- */
- private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
- long end, byte[] buf, int offset,
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
- throws IOException {
- final DfsClientConf conf = dfsClient.getConf();
- ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
- CompletionService<ByteBuffer> hedgedService =
- new ExecutorCompletionService<ByteBuffer>(
- dfsClient.getHedgedReadsThreadPool());
- ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
- ByteBuffer bb = null;
- int len = (int) (end - start + 1);
- int hedgedReadId = 0;
- block = refreshLocatedBlock(block);
- while (true) {
- // see HDFS-6591, this metric is used to verify/catch unnecessary loops
- hedgedReadOpsLoopNumForTesting++;
- DNAddrPair chosenNode = null;
- // There is no request already executing.
- if (futures.isEmpty()) {
- // chooseDataNode is a commitment. If there is no node, we go to
- // the NN to re-fetch block locations. We only get here on the first read.
- chosenNode = chooseDataNode(block, ignored);
- bb = ByteBuffer.wrap(buf, offset, len);
- Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb,
- corruptedBlockMap, hedgedReadId++);
- Future<ByteBuffer> firstRequest = hedgedService
- .submit(getFromDataNodeCallable);
- futures.add(firstRequest);
- try {
- Future<ByteBuffer> future = hedgedService.poll(
- conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
- if (future != null) {
- future.get();
- return;
- }
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Waited " + conf.getHedgedReadThresholdMillis()
- + "ms to read from " + chosenNode.info
- + "; spawning hedged read");
- }
- // Ignore this node on next go around.
- ignored.add(chosenNode.info);
- dfsClient.getHedgedReadMetrics().incHedgedReadOps();
- continue; // no need to refresh block locations
- } catch (InterruptedException e) {
- // Ignore
- } catch (ExecutionException e) {
- // Ignore already logged in the call.
- }
- } else {
- // We are starting up a 'hedged' read. We have a read already
- // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
- // If no nodes to do hedged reads against, pass.
- try {
- chosenNode = getBestNodeDNAddrPair(block, ignored);
- if (chosenNode == null) {
- chosenNode = chooseDataNode(block, ignored);
- }
- bb = ByteBuffer.allocate(len);
- Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb,
- corruptedBlockMap, hedgedReadId++);
- Future<ByteBuffer> oneMoreRequest = hedgedService
- .submit(getFromDataNodeCallable);
- futures.add(oneMoreRequest);
- } catch (IOException ioe) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Failed getting node for hedged read: "
- + ioe.getMessage());
- }
- }
- // Each pass of the loop submits a callable for one more datanode;
- // here we wait and take the result from whichever finishes first.
- try {
- ByteBuffer result = getFirstToComplete(hedgedService, futures);
- // cancel the rest.
- cancelAll(futures);
- if (result.array() != buf) { // compare the array pointers
- dfsClient.getHedgedReadMetrics().incHedgedReadWins();
- System.arraycopy(result.array(), result.position(), buf, offset,
- len);
- } else {
- dfsClient.getHedgedReadMetrics().incHedgedReadOps();
- }
- return;
- } catch (InterruptedException ie) {
- // Ignore and retry
- }
- // We get here on exception. Ignore this node on the next go-around
- // IFF we found a chosenNode to hedge-read against.
- if (chosenNode != null && chosenNode.info != null) {
- ignored.add(chosenNode.info);
- }
- }
- }
- }
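For context, this hedged path is only taken when the client has configured a hedged-read thread pool; otherwise pread falls through to the plain fetchBlockByteRange. A minimal client-side sketch, assuming a reachable HDFS cluster and the standard hedged-read configuration keys (the file path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HedgedReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A pool size > 0 enables hedged reads in the DFSClient.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
    // How long the first read may run before a hedge is spawned.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
      byte[] buf = new byte[4096];
      // Positional reads route through hedgedFetchBlockByteRange when enabled.
      int nread = in.read(0L, buf, 0, buf.length);
      System.out.println("read " + nread + " bytes");
    }
  }
}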
-
- @VisibleForTesting
- public long getHedgedReadOpsLoopNumForTesting() {
- return hedgedReadOpsLoopNumForTesting;
- }
-
- private ByteBuffer getFirstToComplete(
- CompletionService<ByteBuffer> hedgedService,
- ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
- if (futures.isEmpty()) {
- throw new InterruptedException("let's retry");
- }
- Future<ByteBuffer> future = null;
- try {
- future = hedgedService.take();
- ByteBuffer bb = future.get();
- futures.remove(future);
- return bb;
- } catch (ExecutionException e) {
- // already logged in the Callable
- futures.remove(future);
- } catch (CancellationException ce) {
- // already logged in the Callable
- futures.remove(future);
- }
-
- throw new InterruptedException("let's retry");
- }
-
- private void cancelAll(List<Future<ByteBuffer>> futures) {
- for (Future<ByteBuffer> future : futures) {
- // Unfortunately, hdfs reads do not take kindly to interruption.
- // Threads return a variety of interrupted-type exceptions, but
- // also complaints about invalid pbs -- likely because a read
- // is interrupted before it gets the whole pb. There is also
- // verbose WARN logging. So, for now, do not interrupt a running read.
- future.cancel(false);
- }
- }
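getFirstToComplete and cancelAll together implement the usual CompletionService first-to-complete pattern: submit competing tasks, take() whichever finishes first, then cancel the rest without interruption. A self-contained sketch of that pattern with illustrative task bodies (not HDFS code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FirstToCompleteDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> cs = new ExecutorCompletionService<>(pool);
    List<Future<String>> futures = new ArrayList<>();
    futures.add(cs.submit(() -> { Thread.sleep(500); return "slow"; }));
    futures.add(cs.submit(() -> { Thread.sleep(50); return "fast"; }));
    // take() blocks until whichever task finishes first.
    Future<String> first = cs.take();
    System.out.println("winner: " + first.get());
    // Cancel the rest without interrupting, as cancelAll does above.
    for (Future<String> f : futures) {
      f.cancel(false);
    }
    pool.shutdown();
  }
}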
-
- /**
- * Should the block access token be refetched on an exception?
- *
- * @param ex the exception received
- * @param targetAddr target datanode address from which the exception was received
- * @return true if the block access token has expired or is invalid and
- * should be refetched
- */
- protected static boolean tokenRefetchNeeded(IOException ex,
- InetSocketAddress targetAddr) {
- /*
- * Get a new access token and retry. Retry is needed in 2 cases. 1)
- * Both the NN and DN restarted while the DFSClient was holding a cached
- * access token. 2) The NN failed to update its access key at the pre-set
- * interval (by a wide margin) and subsequently restarted. In this case,
- * the DN re-registers itself with the NN and receives a new access key,
- * but the DN deletes the old access key from its memory since it is
- * considered expired based on the estimated expiration date.
- */
- if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
- DFSClient.LOG.info("Access token was invalid when connecting to "
- + targetAddr + " : " + ex);
- return true;
- }
- return false;
- }
-
- /**
- * Read bytes starting from the specified position.
- *
- * @param position start read from this position
- * @param buffer read buffer
- * @param offset offset into buffer
- * @param length number of bytes to read
- *
- * @return actual number of bytes read
- */
- @Override
- public int read(long position, byte[] buffer, int offset, int length)
- throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src);
- try {
- return pread(position, buffer, offset, length);
- } finally {
- scope.close();
- }
- }
-
- private int pread(long position, byte[] buffer, int offset, int length)
- throws IOException {
- // sanity checks
- dfsClient.checkOpen();
- if (closed.get()) {
- throw new IOException("Stream closed");
- }
- failures = 0;
- long filelen = getFileLength();
- if ((position < 0) || (position >= filelen)) {
- return -1;
- }
- int realLen = length;
- if ((position + length) > filelen) {
- realLen = (int)(filelen - position);
- }
-
- // determine the block and byte range within the block
- // corresponding to position and realLen
- List<LocatedBlock> blockRange = getBlockRange(position, realLen);
- int remaining = realLen;
- Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
- = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
- for (LocatedBlock blk : blockRange) {
- long targetStart = position - blk.getStartOffset();
- long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
- try {
- if (dfsClient.isHedgedReadsEnabled()) {
- hedgedFetchBlockByteRange(blk, targetStart,
- targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
- } else {
- fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
- buffer, offset, corruptedBlockMap);
- }
- } finally {
- // Check and report if any block replicas are corrupted.
- // BlockMissingException may be caught if all block replicas are
- // corrupted.
- reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
- }
-
- remaining -= bytesToRead;
- position += bytesToRead;
- offset += bytesToRead;
- }
- assert remaining == 0 : "Wrong number of bytes read.";
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(realLen);
- }
- return realLen;
- }
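The contract implemented here is the standard PositionedReadable pread contract: a read starting at or past EOF returns -1, a read crossing EOF is truncated to the bytes remaining, and the stream's cursor is left untouched. A sketch against the local filesystem, assuming it honors the same contract (path and sizes are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadContractSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path p = new Path("/tmp/pread-demo");
    try (FSDataOutputStream out = fs.create(p, true)) {
      out.write(new byte[100]); // a 100-byte file
    }
    byte[] buf = new byte[64];
    try (FSDataInputStream in = fs.open(p)) {
      // Crosses EOF: truncated to the 10 bytes left, not 64.
      System.out.println(in.read(90L, buf, 0, buf.length));
      // Starts at EOF: returns -1.
      System.out.println(in.read(100L, buf, 0, buf.length));
      // Positional reads do not move the cursor: still 0.
      System.out.println(in.getPos());
    }
  }
}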
-
- /**
- * DFSInputStream reports checksum failure.
- * Case I : client has tried multiple data nodes and at least one of the
- * attempts has succeeded. We report the other failures as corrupted
- * blocks to the namenode.
- * Case II: client has tried all data nodes, but all attempts failed. We
- * report only if the total number of replicas is 1, since otherwise the
- * failure may be due to the client itself being unable to read rather
- * than to corrupted replicas.
- * @param corruptedBlockMap map of corrupted blocks
- * @param dataNodeCount number of data nodes that contain the block replicas
- */
- protected void reportCheckSumFailure(
- Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
- int dataNodeCount) {
- if (corruptedBlockMap.isEmpty()) {
- return;
- }
- Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
- .entrySet().iterator();
- Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
- ExtendedBlock blk = entry.getKey();
- Set<DatanodeInfo> dnSet = entry.getValue();
- if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
- || ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
- DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
- int i = 0;
- for (DatanodeInfo dn:dnSet) {
- locs[i++] = dn;
- }
- LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
- dfsClient.reportChecksumFailure(src, lblocks);
- }
- corruptedBlockMap.clear();
- }
-
- @Override
- public long skip(long n) throws IOException {
- if ( n > 0 ) {
- long curPos = getPos();
- long fileLen = getFileLength();
- if( n+curPos > fileLen ) {
- n = fileLen - curPos;
- }
- seek(curPos+n);
- return n;
- }
- return n < 0 ? -1 : 0;
- }
-
- /**
- * Seek to a new arbitrary location
- */
- @Override
- public synchronized void seek(long targetPos) throws IOException {
- if (targetPos > getFileLength()) {
- throw new EOFException("Cannot seek after EOF");
- }
- if (targetPos < 0) {
- throw new EOFException("Cannot seek to negative offset");
- }
- if (closed.get()) {
- throw new IOException("Stream is closed!");
- }
- boolean done = false;
- if (pos <= targetPos && targetPos <= blockEnd) {
- //
- // If this seek is to a positive position in the current
- // block, and this piece of data might already be lying in
- // the TCP buffer, then just eat up the intervening data.
- //
- int diff = (int)(targetPos - pos);
- if (diff <= blockReader.available()) {
- try {
- pos += blockReader.skip(diff);
- if (pos == targetPos) {
- done = true;
- } else {
- // The range was already checked. If the block reader returns
- // something unexpected instead of throwing an exception, it is
- // most likely a bug.
- String errMsg = "BlockReader failed to seek to " +
- targetPos + ". Instead, it seeked to " + pos + ".";
- DFSClient.LOG.warn(errMsg);
- throw new IOException(errMsg);
- }
- } catch (IOException e) {//make following read to retry
- if(DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Exception while seek to " + targetPos
- + " from " + getCurrentBlock() + " of " + src + " from "
- + currentNode, e);
- }
- }
- }
- }
- if (!done) {
- pos = targetPos;
- blockEnd = -1;
- }
- }
-
- /**
- * Same as {@link #seekToNewSource(long)} except that it does not exclude
- * the current datanode and might connect to the same node.
- */
- private boolean seekToBlockSource(long targetPos)
- throws IOException {
- currentNode = blockSeekTo(targetPos);
- return true;
- }
-
- /**
- * Seek to the given position on a node other than the current node. If
- * a node other than the current node is found, then returns true.
- * If another node could not be found, then returns false.
- */
- @Override
- public synchronized boolean seekToNewSource(long targetPos) throws IOException {
- if (currentNode == null) {
- return seekToBlockSource(targetPos);
- }
- boolean markedDead = deadNodes.containsKey(currentNode);
- addToDeadNodes(currentNode);
- DatanodeInfo oldNode = currentNode;
- DatanodeInfo newNode = blockSeekTo(targetPos);
- if (!markedDead) {
- /* Remove it from deadNodes. blockSeekTo could have cleared
- * deadNodes and added currentNode again. That's ok. */
- deadNodes.remove(oldNode);
- }
- if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
- currentNode = newNode;
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Return the current offset in the stream.
- */
- @Override
- public synchronized long getPos() {
- return pos;
- }
-
- /** Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed.get()) {
- throw new IOException("Stream closed");
- }
-
- final long remaining = getFileLength() - pos;
- return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
- }
-
- /**
- * We definitely don't support marks
- */
- @Override
- public boolean markSupported() {
- return false;
- }
- @Override
- public void mark(int readLimit) {
- }
- @Override
- public void reset() throws IOException {
- throw new IOException("Mark/reset not supported");
- }
-
- /** Utility class to encapsulate data node info and its address. */
- static final class DNAddrPair {
- final DatanodeInfo info;
- final InetSocketAddress addr;
- final StorageType storageType;
-
- DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
- StorageType storageType) {
- this.info = info;
- this.addr = addr;
- this.storageType = storageType;
- }
- }
-
- /**
- * Get statistics about the reads which this DFSInputStream has done.
- */
- public ReadStatistics getReadStatistics() {
- synchronized(infoLock) {
- return new ReadStatistics(readStatistics);
- }
- }
-
- /**
- * Clear statistics about the reads which this DFSInputStream has done.
- */
- public void clearReadStatistics() {
- synchronized(infoLock) {
- readStatistics.clear();
- }
- }
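Applications normally reach these counters through the public HdfsDataInputStream wrapper rather than DFSInputStream directly. A hedged sketch (the cast assumes the stream is actually backed by HDFS; the path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

public class ReadStatsSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (HdfsDataInputStream in =
        (HdfsDataInputStream) fs.open(new Path("/tmp/example"))) {
      byte[] buf = new byte[8192];
      while (in.read(buf) != -1) {
        // drain the file
      }
      // Snapshot of the counters guarded by infoLock above.
      System.out.println("total bytes read: "
          + in.getReadStatistics().getTotalBytesRead());
      System.out.println("short-circuit bytes: "
          + in.getReadStatistics().getTotalShortCircuitBytesRead());
    }
  }
}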
-
- public FileEncryptionInfo getFileEncryptionInfo() {
- synchronized(infoLock) {
- return fileEncryptionInfo;
- }
- }
-
- protected void closeCurrentBlockReaders() {
- if (blockReader == null) return;
- // Close the current block reader so that the new caching settings can
- // take effect immediately.
- try {
- blockReader.close();
- } catch (IOException e) {
- DFSClient.LOG.error("error closing blockReader", e);
- }
- blockReader = null;
- blockEnd = -1;
- }
-
- @Override
- public synchronized void setReadahead(Long readahead)
- throws IOException {
- synchronized (infoLock) {
- this.cachingStrategy =
- new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
- }
- closeCurrentBlockReaders();
- }
-
- @Override
- public synchronized void setDropBehind(Boolean dropBehind)
- throws IOException {
- synchronized (infoLock) {
- this.cachingStrategy =
- new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
- }
- closeCurrentBlockReaders();
- }
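Both hints are exposed to applications through FSDataInputStream, which forwards them via the CanSetReadahead and CanSetDropBehind interfaces. A brief usage sketch (values and path are illustrative; either call may throw UnsupportedOperationException on streams that do not support the hint):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachingHintsSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
      in.setReadahead(4L * 1024 * 1024); // hint: prefetch up to 4 MB
      in.setDropBehind(true); // hint: drop cached pages once read
      // Per the code above, the current block reader is closed so the new
      // CachingStrategy takes effect on the next read.
      byte[] buf = new byte[8192];
      in.read(buf, 0, buf.length);
    }
  }
}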
-
- /**
- * The immutable empty buffer we return when we reach EOF during a
- * zero-copy read.
- */
- private static final ByteBuffer EMPTY_BUFFER =
- ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
-
- @Override
- public synchronized ByteBuffer read(ByteBufferPool bufferPool,
- int maxLength, EnumSet<ReadOption> opts)
- throws IOException, UnsupportedOperationException {
- if (maxLength == 0) {
- return EMPTY_BUFFER;
- } else if (maxLength < 0) {
- throw new IllegalArgumentException("can't read a negative " +
- "number of bytes.");
- }
- if ((blockReader == null) || (blockEnd == -1)) {
- if (pos >= getFileLength()) {
- return null;
- }
- /*
- * If we don't have a blockReader, or the one we have has no more bytes
- * left to read, we call seekToBlockSource to get a new blockReader and
- * recalculate blockEnd. Note that we assume we're not at EOF here
- * (we check this above).
- */
- if ((!seekToBlockSource(pos)) || (blockReader == null)) {
- throw new IOException("failed to allocate new BlockReader " +
- "at position " + pos);
- }
- }
- ByteBuffer buffer = null;
- if (dfsClient.getConf().getShortCircuitConf().isShortCircuitMmapEnabled()) {
- buffer = tryReadZeroCopy(maxLength, opts);
- }
- if (buffer != null) {
- return buffer;
- }
- buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
- if (buffer != null) {
- getExtendedReadBuffers().put(buffer, bufferPool);
- }
- return buffer;
- }
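This is the stream half of the zero-copy read API; callers drive it through a ByteBufferPool and must hand every buffer back via releaseBuffer. A usage sketch, assuming checksums may be skipped for the file (path and read size are illustrative):

import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ZeroCopyReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    try (FSDataInputStream in = fs.open(new Path("/tmp/example"))) {
      ByteBuffer buf;
      // null signals EOF; mmap is used when short-circuit conditions hold,
      // otherwise the fallback path copies through the pool.
      while ((buf = in.read(pool, 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS))) != null) {
        System.out.println("got " + buf.remaining() + " bytes");
        in.releaseBuffer(buf); // always return the buffer to the stream
      }
    }
  }
}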
-
- private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
- EnumSet<ReadOption> opts) throws IOException {
- // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
- // JVM to optimize this function.
- final long curPos = pos;
- final long curEnd = blockEnd;
- final long blockStartInFile = currentLocatedBlock.getStartOffset();
- final long blockPos = curPos - blockStartInFile;
-
- // Shorten this read if the end of the block is nearby.
- long length63;
- if ((curPos + maxLength) <= (curEnd + 1)) {
- length63 = maxLength;
- } else {
- length63 = 1 + curEnd - curPos;
- if (length63 <= 0) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
- curPos + " of " + src + "; " + length63 + " bytes left in block. " +
- "blockPos=" + blockPos + "; curPos=" + curPos +
- "; curEnd=" + curEnd);
- }
- return null;
- }
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Reducing read length from " + maxLength +
- " to " + length63 + " to avoid going more than one byte " +
- "past the end of the block. blockPos=" + blockPos +
- "; curPos=" + curPos + "; curEnd=" + curEnd);
- }
- }
- // Make sure we don't go beyond 31-bit offsets in the MappedByteBuffer.
- int length;
- if (blockPos + length63 <= Integer.MAX_VALUE) {
- length = (int)length63;
- } else {
- long length31 = Integer.MAX_VALUE - blockPos;
- if (length31 <= 0) {
- // Java ByteBuffers can't be longer than 2 GB, because they use
- // 4-byte signed integers to represent capacity, etc.
- // So we can't mmap the parts of the block higher than the 2 GB offset.
- // FIXME: we could work around this with multiple memory maps.
- // See HDFS-5101.
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
- curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
- "exceeded. blockPos=" + blockPos + ", curEnd=" + curEnd);
- }
- return null;
- }
- length = (int)length31;
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Reducing read length from " + maxLength +
- " to " + length + " to avoid 31-bit limit. " +
- "blockPos=" + blockPos + "; curPos=" + curPos +
- "; curEnd=" + curEnd);
- }
- }
- final ClientMmap clientMmap = blockReader.getClientMmap(opts);
- if (clientMmap == null) {
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
- curPos + " of " + src + "; BlockReader#getClientMmap returned " +
- "null.");
- }
- return null;
- }
- boolean success = false;
- ByteBuffer buffer;
- try {
- seek(curPos + length);
- buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
- buffer.position((int)blockPos);
- buffer.limit((int)(blockPos + length));
- getExtendedReadBuffers().put(buffer, clientMmap);
- synchronized (infoLock) {
- readStatistics.addZeroCopyBytes(length);
- }
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("readZeroCopy read " + length +
- " bytes from offset " + curPos + " via the zero-copy read " +
- "path. blockEnd = " + blockEnd);
- }
- success = true;
- } finally {
- if (!success) {
- IOUtils.closeQuietly(clientMmap);
- }
- }
- return buffer;
- }
-
- @Override
- public synchronized void releaseBuffer(ByteBuffer buffer) {
- if (buffer == EMPTY_BUFFER) return;
- Object val = getExtendedReadBuffers().remove(buffer);
- if (val == null) {
- throw new IllegalArgumentException("tried to release a buffer " +
- "that was not created by this stream, " + buffer);
- }
- if (val instanceof ClientMmap) {
- IOUtils.closeQuietly((ClientMmap)val);
- } else if (val instanceof ByteBufferPool) {
- ((ByteBufferPool)val).putBuffer(buffer);
- }
- }
-
- @Override
- public synchronized void unbuffer() {
- closeCurrentBlockReaders();
- }
-}