Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:39 UTC
[27/50] [abbrv] hadoop git commit: HDFS-8925. Move BlockReaderLocal
to hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
new file mode 100644
index 0000000..55aa741
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.util.IOUtilsClient;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache of input stream sockets to DataNodes.
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+@VisibleForTesting
+public class PeerCache {
+ private static final Logger LOG = LoggerFactory.getLogger(PeerCache.class);
+
+ private static class Key {
+ final DatanodeID dnID;
+ final boolean isDomain;
+
+ Key(DatanodeID dnID, boolean isDomain) {
+ this.dnID = dnID;
+ this.isDomain = isDomain;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Key)) {
+ return false;
+ }
+ Key other = (Key)o;
+ return dnID.equals(other.dnID) && isDomain == other.isDomain;
+ }
+
+ @Override
+ public int hashCode() {
+ return dnID.hashCode() ^ (isDomain ? 1 : 0);
+ }
+ }
+
+ private static class Value {
+ private final Peer peer;
+ private final long time;
+
+ Value(Peer peer, long time) {
+ this.peer = peer;
+ this.time = time;
+ }
+
+ Peer getPeer() {
+ return peer;
+ }
+
+ long getTime() {
+ return time;
+ }
+ }
+
+ private Daemon daemon;
+ /** A map of idle peers, keyed by datanode and socket type. */
+ private final LinkedListMultimap<Key, Value> multimap =
+ LinkedListMultimap.create();
+ private final int capacity;
+ private final long expiryPeriod;
+
+ public PeerCache(int c, long e) {
+ this.capacity = c;
+ this.expiryPeriod = e;
+
+ if (capacity == 0) {
+ LOG.info("PeerCache is disabled.");
+ } else if (expiryPeriod == 0) {
+ throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+ expiryPeriod + " when cache is enabled.");
+ }
+ }
+
+ private boolean isDaemonStarted() {
+ return daemon != null;
+ }
+
+ private synchronized void startExpiryDaemon() {
+ // start daemon only if not already started
+ if (isDaemonStarted()) {
+ return;
+ }
+
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ PeerCache.this.run();
+ } catch(InterruptedException e) {
+ //noop
+ } finally {
+ PeerCache.this.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(PeerCache.this);
+ }
+ });
+ daemon.start();
+ }
+
+ /**
+ * Get a cached peer connected to the given DataNode.
+ * @param dnId The DataNode to get a Peer for.
+ * @param isDomain Whether to retrieve a DomainPeer or not.
+ *
+ * @return An open Peer connected to the DN, or null if none
+ * was found.
+ */
+ public Peer get(DatanodeID dnId, boolean isDomain) {
+
+ if (capacity <= 0) { // disabled
+ return null;
+ }
+ return getInternal(dnId, isDomain);
+ }
+
+ private synchronized Peer getInternal(DatanodeID dnId, boolean isDomain) {
+ List<Value> sockStreamList = multimap.get(new Key(dnId, isDomain));
+ if (sockStreamList == null) {
+ return null;
+ }
+
+ Iterator<Value> iter = sockStreamList.iterator();
+ while (iter.hasNext()) {
+ Value candidate = iter.next();
+ iter.remove();
+ long ageMs = Time.monotonicNow() - candidate.getTime();
+ Peer peer = candidate.getPeer();
+ if (ageMs >= expiryPeriod) {
+ try {
+ peer.close();
+ } catch (IOException e) {
+ LOG.warn("got IOException closing stale peer " + peer +
+ ", which is " + ageMs + " ms old");
+ }
+ } else if (!peer.isClosed()) {
+ return peer;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Give an unused socket to the cache.
+ */
+ public void put(DatanodeID dnId, Peer peer) {
+ Preconditions.checkNotNull(dnId);
+ Preconditions.checkNotNull(peer);
+ if (peer.isClosed()) return;
+ if (capacity <= 0) {
+ // Cache disabled.
+ IOUtilsClient.cleanup(LOG, peer);
+ return;
+ }
+ putInternal(dnId, peer);
+ }
+
+ private synchronized void putInternal(DatanodeID dnId, Peer peer) {
+ startExpiryDaemon();
+
+ if (capacity == multimap.size()) {
+ evictOldest();
+ }
+ multimap.put(new Key(dnId, peer.getDomainSocket() != null),
+ new Value(peer, Time.monotonicNow()));
+ }
+
+ public synchronized int size() {
+ return multimap.size();
+ }
+
+ /**
+ * Evict and close sockets older than expiry period from the cache.
+ */
+ private synchronized void evictExpired(long expiryPeriod) {
+ while (multimap.size() != 0) {
+ Iterator<Entry<Key, Value>> iter =
+ multimap.entries().iterator();
+ Entry<Key, Value> entry = iter.next();
+ // stop once the oldest entry has not yet expired
+ if (entry == null ||
+ Time.monotonicNow() - entry.getValue().getTime() <
+ expiryPeriod) {
+ break;
+ }
+ IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
+ iter.remove();
+ }
+ }
+
+ /**
+ * Evict the oldest entry in the cache.
+ */
+ private synchronized void evictOldest() {
+ // We can get the oldest element immediately, because of an interesting
+ // property of LinkedListMultimap: its iterator traverses entries in the
+ // order that they were added.
+ Iterator<Entry<Key, Value>> iter =
+ multimap.entries().iterator();
+ if (!iter.hasNext()) {
+ throw new IllegalStateException("Cannot evict from empty cache! " +
+ "capacity: " + capacity);
+ }
+ Entry<Key, Value> entry = iter.next();
+ IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
+ iter.remove();
+ }
+
+ /**
+ * Periodically check the cache and expire entries
+ * older than expiryPeriod milliseconds.
+ */
+ private void run() throws InterruptedException {
+ for(long lastExpiryTime = Time.monotonicNow();
+ !Thread.interrupted();
+ Thread.sleep(expiryPeriod)) {
+ final long elapsed = Time.monotonicNow() - lastExpiryTime;
+ if (elapsed >= expiryPeriod) {
+ evictExpired(expiryPeriod);
+ lastExpiryTime = Time.monotonicNow();
+ }
+ }
+ clear();
+ throw new InterruptedException("Daemon Interrupted");
+ }
+
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ @VisibleForTesting
+ synchronized void clear() {
+ for (Value value : multimap.values()) {
+ IOUtilsClient.cleanup(LOG, value.getPeer());
+ }
+ multimap.clear();
+ }
+
+ @VisibleForTesting
+ void close() {
+ clear();
+ if (daemon != null) {
+ daemon.interrupt();
+ try {
+ daemon.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("failed to join thread");
+ }
+ }
+ daemon = null;
+ }
+}
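
A minimal usage sketch of the get/put contract above; the helper names (borrowOrCreate, release) and the surrounding class are hypothetical and not part of this patch:

    import org.apache.hadoop.hdfs.PeerCache;
    import org.apache.hadoop.hdfs.net.Peer;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    class PeerCacheSketch {
      // Prefer a cached, unexpired peer; otherwise fall back to a freshly
      // created one supplied by the caller (newPeer is a placeholder for
      // whatever dials the datanode).
      static Peer borrowOrCreate(PeerCache cache, DatanodeID dnId, Peer newPeer) {
        Peer cached = cache.get(dnId, false /* TCP peer, not a domain socket */);
        return (cached != null) ? cached : newPeer;
      }

      // Hand an idle, still-open peer back to the cache for later reuse.
      static void release(PeerCache cache, DatanodeID dnId, Peer peer) {
        cache.put(dnId, peer);
      }
    }

Note that a PeerCache constructed with capacity 0 always misses on get() and simply closes any peer handed to put().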
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
new file mode 100644
index 0000000..07f4836
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Options that can be specified when manually triggering a block report.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class BlockReportOptions {
+ private final boolean incremental;
+
+ private BlockReportOptions(boolean incremental) {
+ this.incremental = incremental;
+ }
+
+ public boolean isIncremental() {
+ return incremental;
+ }
+
+ public static class Factory {
+ private boolean incremental = false;
+
+ public Factory() {
+ }
+
+ public Factory setIncremental(boolean incremental) {
+ this.incremental = incremental;
+ return this;
+ }
+
+ public BlockReportOptions build() {
+ return new BlockReportOptions(incremental);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BlockReportOptions{incremental=" + incremental + "}";
+ }
+}
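
A quick sketch of the builder usage above (the TriggerExample class name is made up for illustration):

    import org.apache.hadoop.hdfs.client.BlockReportOptions;

    public class TriggerExample {
      public static void main(String[] args) {
        // Request a full (non-incremental) block report.
        BlockReportOptions options =
            new BlockReportOptions.Factory().setIncremental(false).build();
        System.out.println(options); // BlockReportOptions{incremental=false}
      }
    }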
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 3b492ae..7b1e438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -113,6 +113,11 @@ public interface HdfsClientConfigKeys {
"dfs.datanode.hdfs-blocks-metadata.enabled";
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
+ String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
+ String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+ long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
+ String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
+
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
PREFIX + "replica.accessor.builder.classes";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
new file mode 100644
index 0000000..69fa52d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/BlockLocalPathInfo.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A block and the full path information to the block data file and
+ * the metadata file stored on the local file system.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockLocalPathInfo {
+ private final ExtendedBlock block;
+ private String localBlockPath = ""; // local file storing the data
+ private String localMetaPath = ""; // local file storing the checksum
+
+ /**
+ * Constructs BlockLocalPathInfo.
+ * @param b The block corresponding to this path info.
+ * @param file Block data file.
+ * @param metafile Metadata file for the block.
+ */
+ public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) {
+ block = b;
+ localBlockPath = file;
+ localMetaPath = metafile;
+ }
+
+ /**
+ * Get the Block data file.
+ * @return Block data file.
+ */
+ public String getBlockPath() {return localBlockPath;}
+
+ /**
+ * @return the Block
+ */
+ public ExtendedBlock getBlock() { return block;}
+
+ /**
+ * Get the Block metadata file.
+ * @return Block metadata file.
+ */
+ public String getMetaPath() {return localMetaPath;}
+
+ /**
+ * Get number of bytes in the block.
+ * @return Number of bytes in the block.
+ */
+ public long getNumBytes() {
+ return block.getNumBytes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
new file mode 100644
index 0000000..3374868
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenInfo;
+
+/** A client-datanode protocol for block recovery.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@KerberosInfo(
+ serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(BlockTokenSelector.class)
+public interface ClientDatanodeProtocol {
+ /**
+ * Until version 9, this class ClientDatanodeProtocol served as both
+ * the client interface to the DN AND the RPC protocol used to
+ * communicate with the DN.
+ *
+ * This class is used by both the DFSClient and the
+ * DN server side to insulate them from the protocol serialization.
+ *
+ * If you are adding/changing the DN's interface then you need to
+ * change both this class and ALSO the related protocol buffer
+ * wire protocol definition in ClientDatanodeProtocol.proto.
+ *
+ * For more details on the protocol buffer wire protocol, please see
+ * .../org/apache/hadoop/hdfs/protocolPB/overview.html
+ *
+ * The log of historical changes can be retrieved from the svn.
+ * 9: Added deleteBlockPool method
+ *
+ * 9 is the last version id when this class was used for protocol
+ * serialization. Do not update this version any further.
+ */
+ public static final long versionID = 9L;
+
+ /** Return the visible length of a replica. */
+ long getReplicaVisibleLength(ExtendedBlock b) throws IOException;
+
+ /**
+ * Refresh the list of federated namenodes from the updated configuration.
+ * Adds new namenodes and stops the deleted namenodes.
+ *
+ * @throws IOException on error
+ **/
+ void refreshNamenodes() throws IOException;
+
+ /**
+ * Delete the block pool directory. If force is false it is deleted only if
+ * it is empty, otherwise it is deleted along with its contents.
+ *
+ * @param bpid Blockpool id to be deleted.
+ * @param force If false blockpool directory is deleted only if it is empty
+ * i.e. if it doesn't contain any block files, otherwise it is
+ * deleted along with its contents.
+ * @throws IOException
+ */
+ void deleteBlockPool(String bpid, boolean force) throws IOException;
+
+ /**
+ * Retrieves the path names of the block file and metadata file stored on the
+ * local file system.
+ *
+ * In order for this method to work, one of the following should be satisfied:
+ * <ul>
+ * <li>
+ * The client user must be configured at the datanode to be able to use this
+ * method.</li>
+ * <li>
+ * When security is enabled, kerberos authentication must be used to connect
+ * to the datanode.</li>
+ * </ul>
+ *
+ * @param block
+ * the specified block on the local datanode
+ * @param token
+ * the block access token.
+ * @return the BlockLocalPathInfo of a block
+ * @throws IOException
+ * on error
+ */
+ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException;
+
+ /**
+ * Shuts down a datanode.
+ *
+ * @param forUpgrade If true, data node does extra prep work before shutting
+ * down. The work includes advising clients to wait and saving
+ * certain states for quick restart. This should only be used when
+ * the stored data will remain the same during upgrade/restart.
+ * @throws IOException
+ */
+ void shutdownDatanode(boolean forUpgrade) throws IOException;
+
+ /**
+ * Obtains datanode info
+ *
+ * @return software/config version and uptime of the datanode
+ */
+ DatanodeLocalInfo getDatanodeInfo() throws IOException;
+
+ /**
+ * Asynchronously reload configuration on disk and apply changes.
+ */
+ void startReconfiguration() throws IOException;
+
+ /**
+ * Get the status of the previously issued reconfig task.
+ * @see org.apache.hadoop.conf.ReconfigurationTaskStatus
+ */
+ ReconfigurationTaskStatus getReconfigurationStatus() throws IOException;
+
+ /**
+ * Get a list of allowed properties for reconfiguration.
+ */
+ List<String> listReconfigurableProperties() throws IOException;
+
+ /**
+ * Trigger a new block report.
+ */
+ void triggerBlockReport(BlockReportOptions options)
+ throws IOException;
+}
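
As a rough illustration of the admin-style calls on this interface, a hedged sketch (the DatanodeAdminSketch class and the way the proxy is obtained are not part of this patch; in practice the proxy is usually a ClientDatanodeProtocolTranslatorPB, shown later in this commit):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.client.BlockReportOptions;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;

    class DatanodeAdminSketch {
      // Print the datanode's local info and ask it for a full block report.
      static void pokeDatanode(ClientDatanodeProtocol dn) throws IOException {
        System.out.println(dn.getDatanodeInfo());
        dn.triggerBlockReport(
            new BlockReportOptions.Factory().setIncremental(false).build());
      }
    }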
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
new file mode 100644
index 0000000..170467e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/InvalidEncryptionKeyException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encryption key verification failed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class InvalidEncryptionKeyException extends IOException {
+ private static final long serialVersionUID = 0L;
+
+ public InvalidEncryptionKeyException() {
+ super();
+ }
+
+ public InvalidEncryptionKeyException(String msg) {
+ super(msg);
+ }
+}
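
A hedged sketch of how a caller might treat this exception as retriable; the Transfer callback is a placeholder and not an HDFS API, and the HDFS client normally performs this kind of retry internally after refreshing its data encryption key:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;

    class EncryptionRetrySketch {
      interface Transfer { void run() throws IOException; } // hypothetical callback

      // Retry once when the first attempt fails because the negotiated
      // data encryption key is no longer accepted by the datanode.
      static void runWithOneRetry(Transfer transfer) throws IOException {
        try {
          transfer.run();
        } catch (InvalidEncryptionKeyException e) {
          transfer.run(); // a fresh key would normally be fetched before this
        }
      }
    }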
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
new file mode 100644
index 0000000..7e3f66b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+
+@KerberosInfo(
+ serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
+@TokenInfo(BlockTokenSelector.class)
+@ProtocolInfo(protocolName =
+ "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Private
+public interface ClientDatanodeProtocolPB extends
+ ClientDatanodeProtocolService.BlockingInterface {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
new file mode 100644
index 0000000..311fcea
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationTaskStatus;
+import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ListReconfigurablePropertiesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReconfigurationStatusConfigChangeProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ShutdownDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.StartReconfigurationRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.TriggerBlockReportRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the client side translator to translate the requests made on
+ * {@link ClientDatanodeProtocol} interfaces to the RPC server implementing
+ * {@link ClientDatanodeProtocolPB}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ClientDatanodeProtocolTranslatorPB implements
+ ProtocolMetaInterface, ClientDatanodeProtocol,
+ ProtocolTranslator, Closeable {
+ public static final Logger LOG = LoggerFactory
+ .getLogger(ClientDatanodeProtocolTranslatorPB.class);
+
+ /** RpcController is not used and hence is set to null */
+ private final static RpcController NULL_CONTROLLER = null;
+ private final ClientDatanodeProtocolPB rpcProxy;
+ private final static RefreshNamenodesRequestProto VOID_REFRESH_NAMENODES =
+ RefreshNamenodesRequestProto.newBuilder().build();
+ private final static GetDatanodeInfoRequestProto VOID_GET_DATANODE_INFO =
+ GetDatanodeInfoRequestProto.newBuilder().build();
+ private final static GetReconfigurationStatusRequestProto VOID_GET_RECONFIG_STATUS =
+ GetReconfigurationStatusRequestProto.newBuilder().build();
+ private final static StartReconfigurationRequestProto VOID_START_RECONFIG =
+ StartReconfigurationRequestProto.newBuilder().build();
+ private static final ListReconfigurablePropertiesRequestProto
+ VOID_LIST_RECONFIGURABLE_PROPERTIES =
+ ListReconfigurablePropertiesRequestProto.newBuilder().build();
+
+ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
+ Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
+ LocatedBlock locatedBlock) throws IOException {
+ rpcProxy = createClientDatanodeProtocolProxy(datanodeid, conf,
+ socketTimeout, connectToDnViaHostname, locatedBlock);
+ }
+
+ public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
+ UserGroupInformation ticket, Configuration conf, SocketFactory factory)
+ throws IOException {
+ rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory, 0);
+ }
+
+ /**
+ * Constructor.
+ * @param datanodeid Datanode to connect to.
+ * @param conf Configuration.
+ * @param socketTimeout Socket timeout to use.
+ * @param connectToDnViaHostname connect to the Datanode using its hostname
+ * @throws IOException
+ */
+ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
+ Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
+ throws IOException {
+ final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
+ }
+ rpcProxy = createClientDatanodeProtocolProxy(addr,
+ UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ }
+
+ static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
+ DatanodeID datanodeid, Configuration conf, int socketTimeout,
+ boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
+ final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
+ }
+
+ // Since we're creating a new UserGroupInformation here, we know that no
+ // future RPC proxies will be able to re-use the same connection. And
+ // usages of this proxy tend to be one-off calls.
+ //
+ // This is a temporary fix: callers should really achieve this by using
+ // RPC.stopProxy() on the resulting object, but this is currently not
+ // working in trunk. See the discussion on HDFS-1965.
+ Configuration confWithNoIpcIdle = new Configuration(conf);
+ confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
+ .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
+
+ UserGroupInformation ticket = UserGroupInformation
+ .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
+ ticket.addToken(locatedBlock.getBlockToken());
+ return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle,
+ NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ }
+
+ static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int socketTimeout) throws IOException {
+ RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ return RPC.getProxy(ClientDatanodeProtocolPB.class,
+ RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket,
+ conf, factory, socketTimeout);
+ }
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public long getReplicaVisibleLength(ExtendedBlock b) throws IOException {
+ GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto
+ .newBuilder().setBlock(PBHelperClient.convert(b)).build();
+ try {
+ return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void refreshNamenodes() throws IOException {
+ try {
+ rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void deleteBlockPool(String bpid, boolean force) throws IOException {
+ DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder()
+ .setBlockPool(bpid).setForce(force).build();
+ try {
+ rpcProxy.deleteBlockPool(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
+ Token<BlockTokenIdentifier> token) throws IOException {
+ GetBlockLocalPathInfoRequestProto req =
+ GetBlockLocalPathInfoRequestProto.newBuilder()
+ .setBlock(PBHelperClient.convert(block))
+ .setToken(PBHelperClient.convert(token)).build();
+ GetBlockLocalPathInfoResponseProto resp;
+ try {
+ resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()),
+ resp.getLocalPath(), resp.getLocalMetaPath());
+ }
+
+ @Override
+ public boolean isMethodSupported(String methodName) throws IOException {
+ return RpcClientUtil.isMethodSupported(rpcProxy,
+ ClientDatanodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+ RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName);
+ }
+
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+
+ @Override
+ public void shutdownDatanode(boolean forUpgrade) throws IOException {
+ ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto
+ .newBuilder().setForUpgrade(forUpgrade).build();
+ try {
+ rpcProxy.shutdownDatanode(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public DatanodeLocalInfo getDatanodeInfo() throws IOException {
+ GetDatanodeInfoResponseProto response;
+ try {
+ response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO);
+ return PBHelperClient.convert(response.getLocalInfo());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void startReconfiguration() throws IOException {
+ try {
+ rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public ReconfigurationTaskStatus getReconfigurationStatus() throws IOException {
+ GetReconfigurationStatusResponseProto response;
+ Map<PropertyChange, Optional<String>> statusMap = null;
+ long startTime;
+ long endTime = 0;
+ try {
+ response = rpcProxy.getReconfigurationStatus(NULL_CONTROLLER,
+ VOID_GET_RECONFIG_STATUS);
+ startTime = response.getStartTime();
+ if (response.hasEndTime()) {
+ endTime = response.getEndTime();
+ }
+ if (response.getChangesCount() > 0) {
+ statusMap = Maps.newHashMap();
+ for (GetReconfigurationStatusConfigChangeProto change :
+ response.getChangesList()) {
+ PropertyChange pc = new PropertyChange(
+ change.getName(), change.getNewValue(), change.getOldValue());
+ String errorMessage = null;
+ if (change.hasErrorMessage()) {
+ errorMessage = change.getErrorMessage();
+ }
+ statusMap.put(pc, Optional.fromNullable(errorMessage));
+ }
+ }
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return new ReconfigurationTaskStatus(startTime, endTime, statusMap);
+ }
+
+ @Override
+ public List<String> listReconfigurableProperties()
+ throws IOException {
+ ListReconfigurablePropertiesResponseProto response;
+ try {
+ response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+ VOID_LIST_RECONFIGURABLE_PROPERTIES);
+ return response.getNameList();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void triggerBlockReport(BlockReportOptions options)
+ throws IOException {
+ try {
+ rpcProxy.triggerBlockReport(NULL_CONTROLLER,
+ TriggerBlockReportRequestProto.newBuilder().
+ setIncremental(options.isIncremental()).
+ build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+}
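
A minimal sketch of constructing the translator directly and making one call over it; the host/port arguments and the TranslatorSketch class are illustrative only:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    class TranslatorSketch {
      static void printDatanodeInfo(String host, int ipcPort) throws Exception {
        Configuration conf = new Configuration();
        InetSocketAddress addr = new InetSocketAddress(host, ipcPort);
        // The translator implements Closeable, so try-with-resources stops the proxy.
        try (ClientDatanodeProtocolTranslatorPB dn =
                 new ClientDatanodeProtocolTranslatorPB(addr,
                     UserGroupInformation.getCurrentUser(), conf,
                     NetUtils.getDefaultSocketFactory(conf))) {
          System.out.println(dn.getDatanodeInfo());
        }
      }
    }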
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index edf658a..d921507 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -23,12 +23,14 @@ import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
@@ -185,6 +187,17 @@ public class PBHelperClient {
return pinnings;
}
+ public static ExtendedBlock convert(ExtendedBlockProto eb) {
+ if (eb == null) return null;
+ return new ExtendedBlock(eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
+ eb.getGenerationStamp());
+ }
+
+ public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
+ return new DatanodeLocalInfo(proto.getSoftwareVersion(),
+ proto.getConfigVersion(), proto.getUptime());
+ }
+
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return convert(di);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
new file mode 100644
index 0000000..f67ca00
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSelector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.security.token.block;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * A block token selector for HDFS
+ */
+@InterfaceAudience.Private
+public class BlockTokenSelector implements TokenSelector<BlockTokenIdentifier> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Token<BlockTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) {
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (BlockTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+ return (Token<BlockTokenIdentifier>) token;
+ }
+ }
+ return null;
+ }
+}
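
A small sketch of how the selector is driven; the SelectorSketch wrapper is hypothetical, and in practice the RPC layer invokes the selector via the @TokenInfo annotation on the protocol:

    import java.util.Collection;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    class SelectorSketch {
      // Pick the block token, if any, out of a collection of credentials.
      static Token<BlockTokenIdentifier> pickBlockToken(
          Text service, Collection<Token<? extends TokenIdentifier>> tokens) {
        return new BlockTokenSelector().selectToken(service, tokens);
      }
    }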
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9f77e85..a561909 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -858,6 +858,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
classes from BlockManager. (Mingliang Liu via wheat9)
+ HDFS-8925. Move BlockReaderLocal to hdfs-client.
+ (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
deleted file mode 100644
index aa3e8ba..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.util.EnumSet;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-
-/**
- * A BlockReader is responsible for reading a single block
- * from a single datanode.
- */
-@InterfaceAudience.Private
-public interface BlockReader extends ByteBufferReadable {
-
-
- /* same interface as inputStream java.io.InputStream#read()
- * used by DFSInputStream#read()
- * This violates one rule when there is a checksum error:
- * "Read should not modify user buffer before successful read"
- * because it first reads the data to user buffer and then checks
- * the checksum.
- * Note: this must return -1 on EOF, even in the case of a 0-byte read.
- * See HDFS-5762 for details.
- */
- int read(byte[] buf, int off, int len) throws IOException;
-
- /**
- * Skip the given number of bytes
- */
- long skip(long n) throws IOException;
-
- /**
- * Returns an estimate of the number of bytes that can be read
- * (or skipped over) from this input stream without performing
- * network I/O.
- * This may return more than what is actually present in the block.
- */
- int available() throws IOException;
-
- /**
- * Close the block reader.
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * Read exactly the given amount of data, throwing an exception
- * if EOF is reached before that amount
- */
- void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
-
- /**
- * Similar to {@link #readFully(byte[], int, int)} except that it will
- * not throw an exception on EOF. However, it differs from the simple
- * {@link #read(byte[], int, int)} call in that it is guaranteed to
- * read the data if it is available. In other words, if this call
- * does not throw an exception, then either the buffer has been
- * filled or the next call will return EOF.
- */
- int readAll(byte[] buf, int offset, int len) throws IOException;
-
- /**
- * @return true only if this is a local read.
- */
- boolean isLocal();
-
- /**
- * @return true only if this is a short-circuit read.
- * All short-circuit reads are also local.
- */
- boolean isShortCircuit();
-
- /**
- * Get a ClientMmap object for this BlockReader.
- *
- * @param opts The read options to use.
- * @return The ClientMmap object, or null if mmap is not
- * supported.
- */
- ClientMmap getClientMmap(EnumSet<ReadOption> opts);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2c9b288/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
deleted file mode 100644
index d913f3a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ /dev/null
@@ -1,741 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.EnumSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ReadOption;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DirectBufferPool;
-import org.apache.htrace.Sampler;
-import org.apache.htrace.Trace;
-import org.apache.htrace.TraceScope;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-/**
- * BlockReaderLocal enables local short circuited reads. If the DFS client is on
- * the same machine as the datanode, then the client can read files directly
- * from the local file system rather than going through the datanode for better
- * performance. <br>
- * {@link BlockReaderLocal} works as follows:
- * <ul>
- * <li>The client performing short circuit reads must be configured at the
- * datanode.</li>
- * <li>The client gets the file descriptors for the metadata file and the data
- * file for the block using
- * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
- * </li>
- * <li>The client reads the file descriptors.</li>
- * </ul>
- */
-@InterfaceAudience.Private
-class BlockReaderLocal implements BlockReader {
- static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
-
- private static final DirectBufferPool bufferPool = new DirectBufferPool();
-
- public static class Builder {
- private final int bufferSize;
- private boolean verifyChecksum;
- private int maxReadahead;
- private String filename;
- private ShortCircuitReplica replica;
- private long dataPos;
- private ExtendedBlock block;
- private StorageType storageType;
-
- public Builder(ShortCircuitConf conf) {
- this.maxReadahead = Integer.MAX_VALUE;
- this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
- this.bufferSize = conf.getShortCircuitBufferSize();
- }
-
- public Builder setVerifyChecksum(boolean verifyChecksum) {
- this.verifyChecksum = verifyChecksum;
- return this;
- }
-
- public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
- long readahead = cachingStrategy.getReadahead() != null ?
- cachingStrategy.getReadahead() :
- DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
- this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
- return this;
- }
-
- public Builder setFilename(String filename) {
- this.filename = filename;
- return this;
- }
-
- public Builder setShortCircuitReplica(ShortCircuitReplica replica) {
- this.replica = replica;
- return this;
- }
-
- public Builder setStartOffset(long startOffset) {
- this.dataPos = Math.max(0, startOffset);
- return this;
- }
-
- public Builder setBlock(ExtendedBlock block) {
- this.block = block;
- return this;
- }
-
- public Builder setStorageType(StorageType storageType) {
- this.storageType = storageType;
- return this;
- }
-
- public BlockReaderLocal build() {
- Preconditions.checkNotNull(replica);
- return new BlockReaderLocal(this);
- }
- }
-
- private boolean closed = false;
-
- /**
- * Pair of streams for this block.
- */
- private final ShortCircuitReplica replica;
-
- /**
- * The data FileChannel.
- */
- private final FileChannel dataIn;
-
- /**
- * The next place we'll read from in the block data FileChannel.
- *
- * If data is buffered in dataBuf, this offset will be larger than the
- * offset of the next byte which a read() operation will give us.
- */
- private long dataPos;
-
- /**
- * The Checksum FileChannel.
- */
- private final FileChannel checksumIn;
-
- /**
- * Checksum type and size.
- */
- private final DataChecksum checksum;
-
- /**
- * If false, we will always skip the checksum.
- */
- private final boolean verifyChecksum;
-
- /**
- * Name of the block, for logging purposes.
- */
- private final String filename;
-
- /**
- * Block ID and Block Pool ID.
- */
- private final ExtendedBlock block;
-
- /**
- * Cache of Checksum#bytesPerChecksum.
- */
- private final int bytesPerChecksum;
-
- /**
- * Cache of Checksum#checksumSize.
- */
- private final int checksumSize;
-
- /**
- * Maximum number of chunks to allocate.
- *
- * This is used to allocate dataBuf and checksumBuf, in the event that
- * we need them.
- */
- private final int maxAllocatedChunks;
-
- /**
- * True if zero readahead was requested.
- */
- private final boolean zeroReadaheadRequested;
-
- /**
- * Maximum amount of readahead we'll do. This will always be at least the,
- * size of a single chunk, even if {@link #zeroReadaheadRequested} is true.
- * The reason is because we need to do a certain amount of buffering in order
- * to do checksumming.
- *
- * This determines how many bytes we'll use out of dataBuf and checksumBuf.
- * Why do we allocate buffers, and then (potentially) only use part of them?
- * The rationale is that allocating a lot of buffers of different sizes would
- * make it very difficult for the DirectBufferPool to re-use buffers.
- */
- private final int maxReadaheadLength;
-
- /**
- * Buffers data starting at the current dataPos and extending on
- * for dataBuf.limit().
- *
- * This may be null if we don't need it.
- */
- private ByteBuffer dataBuf;
-
- /**
- * Buffers checksums starting at the current checksumPos and extending on
- * for checksumBuf.limit().
- *
- * This may be null if we don't need it.
- */
- private ByteBuffer checksumBuf;
-
- /**
- * StorageType of replica on DataNode.
- */
- private StorageType storageType;
-
- private BlockReaderLocal(Builder builder) {
- this.replica = builder.replica;
- this.dataIn = replica.getDataStream().getChannel();
- this.dataPos = builder.dataPos;
- this.checksumIn = replica.getMetaStream().getChannel();
- BlockMetadataHeader header = builder.replica.getMetaHeader();
- this.checksum = header.getChecksum();
- this.verifyChecksum = builder.verifyChecksum &&
- (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
- this.filename = builder.filename;
- this.block = builder.block;
- this.bytesPerChecksum = checksum.getBytesPerChecksum();
- this.checksumSize = checksum.getChecksumSize();
-
- this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
- ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
- // Calculate the effective maximum readahead.
- // We can't do more readahead than there is space in the buffer.
- int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
- ((Math.min(builder.bufferSize, builder.maxReadahead) +
- bytesPerChecksum - 1) / bytesPerChecksum);
- if (maxReadaheadChunks == 0) {
- this.zeroReadaheadRequested = true;
- maxReadaheadChunks = 1;
- } else {
- this.zeroReadaheadRequested = false;
- }
- this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
- this.storageType = builder.storageType;
- }
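
To make the buffer-sizing arithmetic above concrete, here is a worked example with assumed values (illustrative numbers, not defaults read from the configuration):

    // Assume builder.bufferSize = 1048576 (1 MB), builder.maxReadahead = 4194304 (4 MB),
    // and bytesPerChecksum = 512.
    // maxAllocatedChunks = (1048576 + 511) / 512               = 2048
    // maxReadaheadChunks = (min(1048576, 4194304) + 511) / 512 = 2048
    // maxReadaheadLength = 2048 * 512                          = 1048576 bytes
    // zeroReadaheadRequested stays false because maxReadaheadChunks != 0; it only
    // becomes true when the requested readahead rounds down to zero chunks, in
    // which case a single chunk is still kept so checksummed reads remain possible.
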
-
- private synchronized void createDataBufIfNeeded() {
- if (dataBuf == null) {
- dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
- dataBuf.position(0);
- dataBuf.limit(0);
- }
- }
-
- private synchronized void freeDataBufIfExists() {
- if (dataBuf != null) {
- // When disposing of a dataBuf, we have to move our stored file index
- // backwards.
- dataPos -= dataBuf.remaining();
- dataBuf.clear();
- bufferPool.returnBuffer(dataBuf);
- dataBuf = null;
- }
- }
-
- private synchronized void createChecksumBufIfNeeded() {
- if (checksumBuf == null) {
- checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
- checksumBuf.position(0);
- checksumBuf.limit(0);
- }
- }
-
- private synchronized void freeChecksumBufIfExists() {
- if (checksumBuf != null) {
- checksumBuf.clear();
- bufferPool.returnBuffer(checksumBuf);
- checksumBuf = null;
- }
- }
-
- private synchronized int drainDataBuf(ByteBuffer buf) {
- if (dataBuf == null) return -1;
- int oldLimit = dataBuf.limit();
- int nRead = Math.min(dataBuf.remaining(), buf.remaining());
- if (nRead == 0) {
- return (dataBuf.remaining() == 0) ? -1 : 0;
- }
- try {
- dataBuf.limit(dataBuf.position() + nRead);
- buf.put(dataBuf);
- } finally {
- dataBuf.limit(oldLimit);
- }
- return nRead;
- }
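
The return convention of drainDataBuf above is worth spelling out, since readWithBounceBuffer further down keys its EOF handling off it:

    // drainDataBuf(buf) returns:
    //   -1  -> dataBuf is null or holds no bytes (nothing was buffered)
    //    0  -> dataBuf has bytes but 'buf' had no space remaining
    //   n>0 -> n bytes were copied from dataBuf into 'buf'
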
-
- /**
- * Read from the block file into a buffer.
- *
- * This function overwrites checksumBuf. It will increment dataPos.
- *
- * @param buf The buffer to read into. May be dataBuf.
- * The position and limit of this buffer should be set to
- * multiples of the checksum size.
- * @param canSkipChecksum True if we can skip checksumming.
- *
- * @return Total bytes read. 0 on EOF.
- */
- private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
- throws IOException {
- TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
- block.getBlockId() + ")", Sampler.NEVER);
- try {
- int total = 0;
- long startDataPos = dataPos;
- int startBufPos = buf.position();
- while (buf.hasRemaining()) {
- int nRead = dataIn.read(buf, dataPos);
- if (nRead < 0) {
- break;
- }
- dataPos += nRead;
- total += nRead;
- }
- if (canSkipChecksum) {
- freeChecksumBufIfExists();
- return total;
- }
- if (total > 0) {
- try {
- buf.limit(buf.position());
- buf.position(startBufPos);
- createChecksumBufIfNeeded();
- int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
- checksumBuf.clear();
- checksumBuf.limit(checksumsNeeded * checksumSize);
- long checksumPos = BlockMetadataHeader.getHeaderSize()
- + ((startDataPos / bytesPerChecksum) * checksumSize);
- while (checksumBuf.hasRemaining()) {
- int nRead = checksumIn.read(checksumBuf, checksumPos);
- if (nRead < 0) {
- throw new IOException("Got unexpected checksum file EOF at " +
- checksumPos + ", block file position " + startDataPos + " for " +
- "block " + block + " of file " + filename);
- }
- checksumPos += nRead;
- }
- checksumBuf.flip();
-
- checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
- } finally {
- buf.position(buf.limit());
- }
- }
- return total;
- } finally {
- scope.close();
- }
- }
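
The checksum seek position computed in fillBuffer pairs each data chunk with one checksum in the meta file. With assumed (typical, not taken from this diff) values of bytesPerChecksum = 512 and checksumSize = 4, a fill starting at block offset 8192 looks for its checksums at:

    // checksumPos = BlockMetadataHeader.getHeaderSize()
    //               + (startDataPos / bytesPerChecksum) * checksumSize
    //             = headerSize + (8192 / 512) * 4
    //             = headerSize + 64
    // i.e. the 16 chunks preceding offset 8192 each contributed one 4-byte
    // checksum, so this read's checksums begin 64 bytes past the header.
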
-
- private boolean createNoChecksumContext() {
- if (verifyChecksum) {
- if (storageType != null && storageType.isTransient()) {
- // Checksums are not stored for replicas on transient storage. We do not
- // anchor, because we do not intend for client activity to block eviction
- // from transient storage on the DataNode side.
- return true;
- } else {
- return replica.addNoChecksumAnchor();
- }
- } else {
- return true;
- }
- }
-
- private void releaseNoChecksumContext() {
- if (verifyChecksum) {
- if (storageType == null || !storageType.isTransient()) {
- replica.removeNoChecksumAnchor();
- }
- }
- }
-
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
- boolean canSkipChecksum = createNoChecksumContext();
- try {
- String traceString = null;
- if (LOG.isTraceEnabled()) {
- traceString = new StringBuilder().
- append("read(").
- append("buf.remaining=").append(buf.remaining()).
- append(", block=").append(block).
- append(", filename=").append(filename).
- append(", canSkipChecksum=").append(canSkipChecksum).
- append(")").toString();
- LOG.trace(traceString + ": starting");
- }
- int nRead;
- try {
- if (canSkipChecksum && zeroReadaheadRequested) {
- nRead = readWithoutBounceBuffer(buf);
- } else {
- nRead = readWithBounceBuffer(buf, canSkipChecksum);
- }
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(traceString + ": I/O error", e);
- }
- throw e;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(traceString + ": returning " + nRead);
- }
- return nRead;
- } finally {
- if (canSkipChecksum) releaseNoChecksumContext();
- }
- }
-
- private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
- throws IOException {
- freeDataBufIfExists();
- freeChecksumBufIfExists();
- int total = 0;
- while (buf.hasRemaining()) {
- int nRead = dataIn.read(buf, dataPos);
- if (nRead <= 0) break;
- dataPos += nRead;
- total += nRead;
- }
- return (total == 0 && (dataPos == dataIn.size())) ? -1 : total;
- }
-
- /**
- * Fill the data buffer. If necessary, validate the data against the
- * checksums.
- *
- * We always want the offsets of the data contained in dataBuf to be
- * aligned to the chunk boundary. If we are validating checksums, we
- * accomplish this by seeking backwards in the file until we're on a
- * chunk boundary. (This is necessary because we can't checksum a
- * partial chunk.) If we are not validating checksums, we simply only
- * fill the latter part of dataBuf.
- *
- * @param canSkipChecksum true if we can skip checksumming.
- * @return true if we hit EOF.
- * @throws IOException
- */
- private synchronized boolean fillDataBuf(boolean canSkipChecksum)
- throws IOException {
- createDataBufIfNeeded();
- final int slop = (int)(dataPos % bytesPerChecksum);
- final long oldDataPos = dataPos;
- dataBuf.limit(maxReadaheadLength);
- if (canSkipChecksum) {
- dataBuf.position(slop);
- fillBuffer(dataBuf, canSkipChecksum);
- } else {
- dataPos -= slop;
- dataBuf.position(0);
- fillBuffer(dataBuf, canSkipChecksum);
- }
- dataBuf.limit(dataBuf.position());
- dataBuf.position(Math.min(dataBuf.position(), slop));
- if (LOG.isTraceEnabled()) {
- LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
- "buffer from offset " + oldDataPos + " of " + block);
- }
- return dataBuf.limit() != maxReadaheadLength;
- }
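
A short example of the "slop" (chunk misalignment) handling above, assuming bytesPerChecksum = 512:

    // Suppose dataPos == 1300 when fillDataBuf(false) runs (checksums enabled).
    // slop = 1300 % 512 = 276, so dataPos is rewound to 1024 (a chunk boundary),
    // whole chunks are read and verified from there, and dataBuf.position() is
    // then set to min(position, slop) == 276, so the caller still sees its data
    // starting at file offset 1300.
    // With canSkipChecksum == true there is no rewind: the buffer is filled from
    // index 276 onward with data read straight from offset 1300, keeping buffer
    // index and file offset congruent modulo the chunk size.
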
-
- /**
- * Read using the bounce buffer.
- *
- * A 'direct' read actually has three phases. The first drains any
- * remaining bytes from the slow read buffer. After this the read is
- * guaranteed to be on a checksum chunk boundary. If there are still bytes
- * to read, the fast direct path is used for as many remaining bytes as
- * possible, up to a multiple of the checksum chunk size. Finally, any
- * 'odd' bytes remaining at the end of the read cause another slow read to
- * be issued, which involves an extra copy.
- *
- * Every 'slow' read tries to fill the slow read buffer in one go for
- * efficiency's sake. As described above, all non-checksum-chunk-aligned
- * reads will be served from the slower read path.
- *
- * @param buf The buffer to read into.
- * @param canSkipChecksum True if we can skip checksums.
- */
- private synchronized int readWithBounceBuffer(ByteBuffer buf,
- boolean canSkipChecksum) throws IOException {
- int total = 0;
- int bb = drainDataBuf(buf); // drain bounce buffer if possible
- if (bb >= 0) {
- total += bb;
- if (buf.remaining() == 0) return total;
- }
- boolean eof = true, done = false;
- do {
- if (buf.isDirect() && (buf.remaining() >= maxReadaheadLength)
- && ((dataPos % bytesPerChecksum) == 0)) {
- // Fast lane: try to read directly into user-supplied buffer, bypassing
- // bounce buffer.
- int oldLimit = buf.limit();
- int nRead;
- try {
- buf.limit(buf.position() + maxReadaheadLength);
- nRead = fillBuffer(buf, canSkipChecksum);
- } finally {
- buf.limit(oldLimit);
- }
- if (nRead < maxReadaheadLength) {
- done = true;
- }
- if (nRead > 0) {
- eof = false;
- }
- total += nRead;
- } else {
- // Slow lane: refill bounce buffer.
- if (fillDataBuf(canSkipChecksum)) {
- done = true;
- }
- bb = drainDataBuf(buf); // drain bounce buffer if possible
- if (bb >= 0) {
- eof = false;
- total += bb;
- }
- }
- } while ((!done) && (buf.remaining() > 0));
- return (eof && total == 0) ? -1 : total;
- }
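
As a usage note on the fast lane above: a caller only bypasses the bounce buffer when it hands in a direct ByteBuffer with at least maxReadaheadLength bytes of space while dataPos sits on a chunk boundary. A hedged caller sketch (the buffer size is chosen arbitrarily for illustration):

    // A direct buffer sized as a multiple of the chunk size keeps most of the
    // read on the no-extra-copy path; misaligned head and tail bytes still go
    // through dataBuf.
    ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20);
    int n = reader.read(buf);   // dispatches into readWithBounceBuffer unless
                                // checksums are skipped and readahead is zero
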
-
- @Override
- public synchronized int read(byte[] arr, int off, int len)
- throws IOException {
- boolean canSkipChecksum = createNoChecksumContext();
- int nRead;
- try {
- String traceString = null;
- if (LOG.isTraceEnabled()) {
- traceString = new StringBuilder().
- append("read(arr.length=").append(arr.length).
- append(", off=").append(off).
- append(", len=").append(len).
- append(", filename=").append(filename).
- append(", block=").append(block).
- append(", canSkipChecksum=").append(canSkipChecksum).
- append(")").toString();
- LOG.trace(traceString + ": starting");
- }
- try {
- if (canSkipChecksum && zeroReadaheadRequested) {
- nRead = readWithoutBounceBuffer(arr, off, len);
- } else {
- nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
- }
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(traceString + ": I/O error", e);
- }
- throw e;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(traceString + ": returning " + nRead);
- }
- } finally {
- if (canSkipChecksum) releaseNoChecksumContext();
- }
- return nRead;
- }
-
- private synchronized int readWithoutBounceBuffer(byte arr[], int off,
- int len) throws IOException {
- freeDataBufIfExists();
- freeChecksumBufIfExists();
- int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
- if (nRead > 0) {
- dataPos += nRead;
- } else if ((nRead == 0) && (dataPos == dataIn.size())) {
- return -1;
- }
- return nRead;
- }
-
- private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
- boolean canSkipChecksum) throws IOException {
- createDataBufIfNeeded();
- if (!dataBuf.hasRemaining()) {
- dataBuf.position(0);
- dataBuf.limit(maxReadaheadLength);
- fillDataBuf(canSkipChecksum);
- }
- if (dataBuf.remaining() == 0) return -1;
- int toRead = Math.min(dataBuf.remaining(), len);
- dataBuf.get(arr, off, toRead);
- return toRead;
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- int discardedFromBuf = 0;
- long remaining = n;
- if ((dataBuf != null) && dataBuf.hasRemaining()) {
- discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
- dataBuf.position(dataBuf.position() + discardedFromBuf);
- remaining -= discardedFromBuf;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" +
- filename + "): discarded " + discardedFromBuf + " bytes from " +
- "dataBuf and advanced dataPos by " + remaining);
- }
- dataPos += remaining;
- return n;
- }
-
- @Override
- public int available() throws IOException {
- // We never do network I/O in BlockReaderLocal.
- return Integer.MAX_VALUE;
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) return;
- closed = true;
- if (LOG.isTraceEnabled()) {
- LOG.trace("close(filename=" + filename + ", block=" + block + ")");
- }
- replica.unref();
- freeDataBufIfExists();
- freeChecksumBufIfExists();
- }
-
- @Override
- public synchronized void readFully(byte[] arr, int off, int len)
- throws IOException {
- BlockReaderUtil.readFully(this, arr, off, len);
- }
-
- @Override
- public synchronized int readAll(byte[] buf, int off, int len)
- throws IOException {
- return BlockReaderUtil.readAll(this, buf, off, len);
- }
-
- @Override
- public boolean isLocal() {
- return true;
- }
-
- @Override
- public boolean isShortCircuit() {
- return true;
- }
-
- /**
- * Get or create a memory map for this replica.
- *
- * There are two kinds of ClientMmap objects we could fetch here: one that
- * will always read pre-checksummed data, and one that may read data that
- * hasn't been checksummed.
- *
- * If we fetch the former, "safe" kind of ClientMmap, we have to increment
- * the anchor count on the shared memory slot. This will tell the DataNode
- * not to munlock the block until this ClientMmap is closed.
- * If we fetch the latter, we don't bother with anchoring.
- *
- * @param opts The options to use, such as SKIP_CHECKSUMS.
- *
- * @return null on failure; the ClientMmap otherwise.
- */
- @Override
- public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
- boolean anchor = verifyChecksum &&
- !opts.contains(ReadOption.SKIP_CHECKSUMS);
- if (anchor) {
- if (!createNoChecksumContext()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("can't get an mmap for " + block + " of " + filename +
- " since SKIP_CHECKSUMS was not given, " +
- "we aren't skipping checksums, and the block is not mlocked.");
- }
- return null;
- }
- }
- ClientMmap clientMmap = null;
- try {
- clientMmap = replica.getOrCreateClientMmap(anchor);
- } finally {
- if ((clientMmap == null) && anchor) {
- releaseNoChecksumContext();
- }
- }
- return clientMmap;
- }
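
The mmap path above is the only place where the reader takes an anchor on behalf of the caller. A hedged sketch of requesting the "safe", anchored mapping follows; EnumSet is java.util, ReadOption is org.apache.hadoop.fs.ReadOption, and the returned ClientMmap is reference counted and must be released by the caller, although its release method is not shown in this diff:

    // No SKIP_CHECKSUMS: if checksums are being verified, getClientMmap will try
    // to anchor the block so the DataNode keeps it mlocked while we read it.
    ClientMmap mmap = reader.getClientMmap(EnumSet.noneOf(ReadOption.class));
    if (mmap == null) {
      // Block not mlocked (or anchoring refused): fall back to ordinary read().
    }
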
-
- @VisibleForTesting
- boolean getVerifyChecksum() {
- return this.verifyChecksum;
- }
-
- @VisibleForTesting
- int getMaxReadaheadLength() {
- return this.maxReadaheadLength;
- }
-
- /**
- * Make the replica anchorable. Normally this can only be done by the
- * DataNode. This method is only for testing.
- */
- @VisibleForTesting
- void forceAnchorable() {
- replica.getSlot().makeAnchorable();
- }
-
- /**
- * Make the replica unanchorable. Normally this can only be done by the
- * DataNode. This method is only for testing.
- */
- @VisibleForTesting
- void forceUnanchorable() {
- replica.getSlot().makeUnanchorable();
- }
-}