You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2016/09/14 02:44:08 UTC
[30/50] [abbrv] hadoop git commit: HDFS-10636. Modify ReplicaInfo to
remove the assumption that replica metadata and data are stored in
java.io.File. (Virajith Jalaparti via lei)
HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata and data are stored in java.io.File. (Virajith Jalaparti via lei)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86c9862b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86c9862b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86c9862b
Branch: refs/heads/YARN-2915
Commit: 86c9862bec0248d671e657aa56094a2919b8ac14
Parents: 1c0d18f
Author: Lei Xu <le...@apache.org>
Authored: Tue Sep 13 12:53:37 2016 -0700
Committer: Lei Xu <le...@apache.org>
Committed: Tue Sep 13 12:54:14 2016 -0700
----------------------------------------------------------------------
.../server/datanode/BlockPoolSliceStorage.java | 16 +-
.../hdfs/server/datanode/BlockReceiver.java | 2 +-
.../hdfs/server/datanode/BlockSender.java | 7 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../server/datanode/DataNodeFaultInjector.java | 2 +-
.../hdfs/server/datanode/DataStorage.java | 4 +-
.../hdfs/server/datanode/DirectoryScanner.java | 10 +-
.../hdfs/server/datanode/FinalizedReplica.java | 27 +-
.../hdfs/server/datanode/LocalReplica.java | 479 ++++++++++
.../server/datanode/LocalReplicaInPipeline.java | 417 +++++++++
.../server/datanode/ReplicaBeingWritten.java | 16 +-
.../hdfs/server/datanode/ReplicaBuilder.java | 252 +++++
.../hdfs/server/datanode/ReplicaHandler.java | 6 +-
.../hdfs/server/datanode/ReplicaInPipeline.java | 324 ++-----
.../datanode/ReplicaInPipelineInterface.java | 86 --
.../hdfs/server/datanode/ReplicaInfo.java | 370 ++++----
.../server/datanode/ReplicaUnderRecovery.java | 30 +-
.../datanode/ReplicaWaitingToBeRecovered.java | 27 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 9 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 74 +-
.../impl/FsDatasetAsyncDiskService.java | 71 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 908 +++++++------------
.../datanode/fsdataset/impl/FsDatasetUtil.java | 18 +
.../datanode/fsdataset/impl/FsVolumeImpl.java | 154 +++-
.../datanode/fsdataset/impl/FsVolumeList.java | 2 +-
.../impl/RamDiskAsyncLazyPersistService.java | 34 +-
.../TestClientProtocolForPipelineRecovery.java | 4 +-
.../apache/hadoop/hdfs/TestCrcCorruption.java | 6 +-
.../server/datanode/SimulatedFSDataset.java | 30 +-
.../datanode/TestBlockPoolSliceStorage.java | 6 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 2 +-
.../datanode/TestDataNodeRollingUpgrade.java | 6 +-
.../server/datanode/TestDirectoryScanner.java | 17 +-
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../hdfs/server/datanode/TestTransferRbw.java | 4 +-
.../extdataset/ExternalDatasetImpl.java | 6 +-
.../extdataset/ExternalReplicaInPipeline.java | 26 +-
.../extdataset/TestExternalDataset.java | 4 +-
.../fsdataset/impl/FsDatasetImplTestUtils.java | 43 +-
.../fsdataset/impl/FsDatasetTestUtil.java | 20 +-
.../fsdataset/impl/TestWriteToReplica.java | 4 +-
41 files changed, 2219 insertions(+), 1308 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index fd90ae9..fd89611 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -741,7 +742,20 @@ public class BlockPoolSliceStorage extends Storage {
*
* @return the trash directory for a given block file that is being deleted.
*/
- public String getTrashDirectory(File blockFile) {
+ public String getTrashDirectory(ReplicaInfo info) {
+
+ URI blockURI = info.getBlockURI();
+ try{
+ File blockFile = new File(blockURI);
+ return getTrashDirectory(blockFile);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed to get block file for replica " + info, e);
+ }
+
+ return null;
+ }
+
+ private String getTrashDirectory(File blockFile) {
if (isTrashAllowed(blockFile)) {
Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 522d577..39419c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -121,7 +121,7 @@ class BlockReceiver implements Closeable {
/** the block to receive */
private final ExtendedBlock block;
/** the replica to write */
- private ReplicaInPipelineInterface replicaInfo;
+ private ReplicaInPipeline replicaInfo;
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 9d9502b..c3ba2eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -248,8 +249,8 @@ class BlockSender implements java.io.Closeable {
}
// if there is a write in progress
ChunkChecksum chunkChecksum = null;
- if (replica instanceof ReplicaBeingWritten) {
- final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
+ if (replica.getState() == ReplicaState.RBW) {
+ final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
waitForMinLength(rbw, startOffset + length);
chunkChecksum = rbw.getLastChecksumAndDataLen();
}
@@ -473,7 +474,7 @@ class BlockSender implements java.io.Closeable {
* @param len minimum length to reach
* @throws IOException on failing to reach the len in given wait time
*/
- private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+ private static void waitForMinLength(ReplicaInPipeline rbw, long len)
throws IOException {
// Wait for 3 seconds for rbw replica to reach the minimum length
for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0025040..09ecac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3474,4 +3474,4 @@ public class DataNode extends ReconfigurableBase
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 931c124..aa06aa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -56,6 +56,6 @@ public class DataNodeFaultInjector {
public void failMirrorConnection() throws IOException { }
- public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+ public void failPipeline(ReplicaInPipeline replicaInfo,
String mirrorAddr) throws IOException { }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 0e6b339..7e620c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -204,9 +204,9 @@ public class DataStorage extends Storage {
* @return trash directory if rolling upgrade is in progress, null
* otherwise.
*/
- public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+ public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) {
if (trashEnabledBpids.contains(bpid)) {
- return getBPStorage(bpid).getTrashDirectory(blockFile);
+ return getBPStorage(bpid).getTrashDirectory(info);
}
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index f9ebab9..c50bfaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -597,14 +597,14 @@ public class DirectoryScanner implements Runnable {
diffs.put(bpid, diffRecord);
statsRecord.totalBlocks = blockpoolReport.length;
- List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
- FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
+ List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
+ ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]);
Arrays.sort(memReport); // Sort based on blockId
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < memReport.length && d < blockpoolReport.length) {
- FinalizedReplica memBlock = memReport[m];
+ ReplicaInfo memBlock = memReport[m];
ScanInfo info = blockpoolReport[d];
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@@ -633,7 +633,7 @@ public class DirectoryScanner implements Runnable {
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
- } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+ } else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files don't match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
@@ -652,7 +652,7 @@ public class DirectoryScanner implements Runnable {
}
}
while (m < memReport.length) {
- FinalizedReplica current = memReport[m++];
+ ReplicaInfo current = memReport[m++];
addDifference(diffRecord, statsRecord,
current.getBlockId(), current.getVolume());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index 8daeb51..81a4ab4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -22,11 +22,12 @@ import java.io.File;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
/**
* This class describes a replica that has been finalized.
*/
-public class FinalizedReplica extends ReplicaInfo {
+public class FinalizedReplica extends LocalReplica {
/**
* Constructor
@@ -88,4 +89,28 @@ public class FinalizedReplica extends ReplicaInfo {
public String toString() {
return super.toString();
}
+
+ @Override
+ public ReplicaInfo getOriginalReplica() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getOriginalReplica");
+ }
+
+ @Override
+ public long getRecoveryID() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getRecoveryID");
+ }
+
+ @Override
+ public void setRecoveryID(long recoveryId) {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support setRecoveryID");
+ }
+
+ @Override
+ public ReplicaRecoveryInfo createInfo() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support createInfo");
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
new file mode 100644
index 0000000..cbfc9a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for all replicas which are on local storage media
+ * and hence, are backed by files.
+ */
+abstract public class LocalReplica extends ReplicaInfo {
+
+ /**
+ * Base directory containing numerically-identified sub directories and
+ * possibly blocks.
+ */
+ private File baseDir;
+
+ /**
+ * Whether or not this replica's parent directory includes subdirs, in which
+ * case we can generate them based on the replica's block ID
+ */
+ private boolean hasSubdirs;
+
+ private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
+
+ static final Log LOG = LogFactory.getLog(LocalReplica.class);
+ private final static boolean IS_NATIVE_IO_AVAIL;
+ static {
+ IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
+ if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
+ LOG.warn("Data node cannot fully support concurrent reading"
+ + " and writing without native code extensions on Windows.");
+ }
+ }
+
+ /**
+ * Constructor
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ LocalReplica(Block block, FsVolumeSpi vol, File dir) {
+ this(block.getBlockId(), block.getNumBytes(),
+ block.getGenerationStamp(), vol, dir);
+ }
+
+ /**
+ * Constructor
+ * @param blockId block id
+ * @param len replica length
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ */
+ LocalReplica(long blockId, long len, long genStamp,
+ FsVolumeSpi vol, File dir) {
+ super(vol, blockId, len, genStamp);
+ setDirInternal(dir);
+ }
+
+ /**
+ * Copy constructor.
+ * @param from the source replica
+ */
+ LocalReplica(LocalReplica from) {
+ this(from, from.getVolume(), from.getDir());
+ }
+
+ /**
+ * Get the full path of this replica's data file.
+ * @return the full path of this replica's data file
+ */
+ @VisibleForTesting
+ public File getBlockFile() {
+ return new File(getDir(), getBlockName());
+ }
+
+ /**
+ * Get the full path of this replica's meta file.
+ * @return the full path of this replica's meta file
+ */
+ @VisibleForTesting
+ public File getMetaFile() {
+ return new File(getDir(),
+ DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
+ }
+
+ /**
+ * Return the parent directory path where this replica is located.
+ * @return the parent directory path where this replica is located
+ */
+ protected File getDir() {
+ return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+ getBlockId()) : baseDir;
+ }
+
+ /**
+ * Set the parent directory where this replica is located.
+ * @param dir the parent directory where the replica is located
+ */
+ private void setDirInternal(File dir) {
+ if (dir == null) {
+ baseDir = null;
+ return;
+ }
+
+ ReplicaDirInfo dirInfo = parseBaseDir(dir);
+ this.hasSubdirs = dirInfo.hasSubidrs;
+
+ synchronized (internedBaseDirs) {
+ if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
+ // Create a new String path of this file and make a brand new File object
+ // to guarantee we drop the reference to the underlying char[] storage.
+ File baseDir = new File(dirInfo.baseDirPath);
+ internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
+ }
+ this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
+ }
+ }
+
+ @VisibleForTesting
+ public static class ReplicaDirInfo {
+ public String baseDirPath;
+ public boolean hasSubidrs;
+
+ public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+ this.baseDirPath = baseDirPath;
+ this.hasSubidrs = hasSubidrs;
+ }
+ }
+
+ @VisibleForTesting
+ public static ReplicaDirInfo parseBaseDir(File dir) {
+
+ File currentDir = dir;
+ boolean hasSubdirs = false;
+ while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
+ hasSubdirs = true;
+ currentDir = currentDir.getParentFile();
+ }
+
+ return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
+ }
+
+ /**
+ * Copy specified file into a temporary file. Then rename the
+ * temporary file to the original name. This will cause any
+ * hardlinks to the original file to be removed. The temporary
+ * files are created in the same directory. The temporary files will
+ * be recovered (especially on Windows) on datanode restart.
+ */
+ private void breakHardlinks(File file, Block b) throws IOException {
+ File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+ try (FileInputStream in = new FileInputStream(file)) {
+ try (FileOutputStream out = new FileOutputStream(tmpFile)){
+ IOUtils.copyBytes(in, out, 16 * 1024);
+ }
+ if (file.length() != tmpFile.length()) {
+ throw new IOException("Copy of file " + file + " size " + file.length()+
+ " into file " + tmpFile +
+ " resulted in a size of " + tmpFile.length());
+ }
+ FileUtil.replaceFile(tmpFile, file);
+ } catch (IOException e) {
+ boolean done = tmpFile.delete();
+ if (!done) {
+ DataNode.LOG.info("detachFile failed to delete temporary file " +
+ tmpFile);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * This function "breaks hardlinks" to the current replica file.
+ *
+ * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+ * file. This cleverly ensures that both the old and the new storage
+ * directories can contain the same block file, without using additional space
+ * for the data.
+ *
+ * However, when we want to append to the replica file, we need to "break" the
+ * hardlink to ensure that the old snapshot continues to contain the old data
+ * length. If we failed to do that, we could roll back to the previous/
+ * directory during a downgrade, and find that the block contents were longer
+ * than they were at the time of upgrade.
+ *
+ * @return always true; failures are reported by throwing IOException.
+ * @throws IOException if the replica has no volume or a hardlink cannot be broken
+ */
+ public boolean breakHardLinksIfNeeded() throws IOException {
+ File file = getBlockFile();
+ if (file == null || getVolume() == null) {
+ throw new IOException("detachBlock:Block not found. " + this);
+ }
+ File meta = getMetaFile();
+
+ int linkCount = HardLink.getLinkCount(file);
+ if (linkCount > 1) {
+ DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+ "block " + this);
+ breakHardlinks(file, this);
+ }
+ if (HardLink.getLinkCount(meta) > 1) {
+ breakHardlinks(meta, this);
+ }
+ return true;
+ }
+
+ @Override
+ public URI getBlockURI() {
+ return getBlockFile().toURI();
+ }
+
+ @Override
+ public InputStream getDataInputStream(long seekOffset) throws IOException {
+
+ File blockFile = getBlockFile();
+ if (IS_NATIVE_IO_AVAIL) {
+ return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+ } else {
+ try {
+ return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Block " + this + " is not valid. " +
+ "Expected block file at " + blockFile + " does not exist.");
+ }
+ }
+ }
+
+ @Override
+ public OutputStream getDataOutputStream(boolean append) throws IOException {
+ return new FileOutputStream(getBlockFile(), append);
+ }
+
+ @Override
+ public boolean blockDataExists() {
+ return getBlockFile().exists();
+ }
+
+ @Override
+ public boolean deleteBlockData() {
+ return getBlockFile().delete();
+ }
+
+ @Override
+ public long getBlockDataLength() {
+ return getBlockFile().length();
+ }
+
+ @Override
+ public URI getMetadataURI() {
+ return getMetaFile().toURI();
+ }
+
+ @Override
+ public LengthInputStream getMetadataInputStream(long offset)
+ throws IOException {
+ File meta = getMetaFile();
+ return new LengthInputStream(
+ FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+ }
+
+ @Override
+ public OutputStream getMetadataOutputStream(boolean append)
+ throws IOException {
+ return new FileOutputStream(getMetaFile(), append);
+ }
+
+ @Override
+ public boolean metadataExists() {
+ return getMetaFile().exists();
+ }
+
+ @Override
+ public boolean deleteMetadata() {
+ return getMetaFile().delete();
+ }
+
+ @Override
+ public long getMetadataLength() {
+ return getMetaFile().length();
+ }
+
+ @Override
+ public boolean renameMeta(URI destURI) throws IOException {
+ return renameFile(getMetaFile(), new File(destURI));
+ }
+
+ @Override
+ public boolean renameData(URI destURI) throws IOException {
+ return renameFile(getBlockFile(), new File(destURI));
+ }
+
+ private boolean renameFile(File srcfile, File destfile) throws IOException {
+ try {
+ NativeIO.renameTo(srcfile, destfile);
+ return true;
+ } catch (IOException e) {
+ throw new IOException("Failed to move block file for " + this
+ + " from " + srcfile + " to " + destfile.getAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public void updateWithReplica(StorageLocation replicaLocation) {
+ // for local replicas, the replica location is assumed to be a file.
+ File diskFile = replicaLocation.getFile();
+ if (null == diskFile) {
+ setDirInternal(null);
+ } else {
+ setDirInternal(diskFile.getParentFile());
+ }
+ }
+
+ @Override
+ public boolean getPinning(LocalFileSystem localFS) throws IOException {
+ FileStatus fss =
+ localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
+ return fss.getPermission().getStickyBit();
+ }
+
+ @Override
+ public void setPinning(LocalFileSystem localFS) throws IOException {
+ File f = getBlockFile();
+ Path p = new Path(f.getAbsolutePath());
+
+ FsPermission oldPermission = localFS.getFileStatus(
+ new Path(f.getAbsolutePath())).getPermission();
+ //sticky bit is used for pinning purpose
+ FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+ oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+ localFS.setPermission(p, permission);
+ }
+
+ @Override
+ public void bumpReplicaGS(long newGS) throws IOException {
+ long oldGS = getGenerationStamp();
+ File oldmeta = getMetaFile();
+ setGenerationStamp(newGS);
+ File newmeta = getMetaFile();
+
+ // rename meta file to new GS
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+ }
+ try {
+ // renameMeta() would not work here: GS is already newGS, so getMetaFile() returns newmeta
+ NativeIO.renameTo(oldmeta, newmeta);
+ } catch (IOException e) {
+ setGenerationStamp(oldGS); // restore old GS
+ throw new IOException("Block " + this + " reopen failed. " +
+ " Unable to move meta file " + oldmeta +
+ " to " + newmeta, e);
+ }
+ }
+
+ @Override
+ public void truncateBlock(long newLength) throws IOException {
+ truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+ }
+
+ @Override
+ public int compareWith(ScanInfo info) {
+ return info.getBlockFile().compareTo(getBlockFile());
+ }
+
+ static public void truncateBlock(File blockFile, File metaFile,
+ long oldlen, long newlen) throws IOException {
+ LOG.info("truncateBlock: blockFile=" + blockFile
+ + ", metaFile=" + metaFile
+ + ", oldlen=" + oldlen
+ + ", newlen=" + newlen);
+
+ if (newlen == oldlen) {
+ return;
+ }
+ if (newlen > oldlen) {
+ throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
+ + ") to newlen (=" + newlen + ")");
+ }
+
+ DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+ int checksumsize = dcs.getChecksumSize();
+ int bpc = dcs.getBytesPerChecksum();
+ long n = (newlen - 1)/bpc + 1;
+ long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+ long lastchunkoffset = (n - 1)*bpc;
+ int lastchunksize = (int)(newlen - lastchunkoffset);
+ byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
+
+ RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+ try {
+ //truncate blockFile
+ blockRAF.setLength(newlen);
+
+ //read last chunk
+ blockRAF.seek(lastchunkoffset);
+ blockRAF.readFully(b, 0, lastchunksize);
+ } finally {
+ blockRAF.close();
+ }
+
+ //compute checksum
+ dcs.update(b, 0, lastchunksize);
+ dcs.writeValue(b, 0, false);
+
+ //update metaFile
+ RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+ try {
+ metaRAF.setLength(newmetalen);
+ metaRAF.seek(newmetalen - checksumsize);
+ metaRAF.write(b, 0, checksumsize);
+ } finally {
+ metaRAF.close();
+ }
+ }
+
+ @Override
+ public void copyMetadata(URI destination) throws IOException {
+ // for local replicas, we assume the destination URI is a file
+ Storage.nativeCopyFileUnbuffered(getMetaFile(),
+ new File(destination), true);
+ }
+
+ @Override
+ public void copyBlockdata(URI destination) throws IOException {
+ // for local replicas, we assume the destination URI is a file
+ Storage.nativeCopyFileUnbuffered(getBlockFile(),
+ new File(destination), true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
new file mode 100644
index 0000000..bc7bc6d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class defines a replica in a pipeline, which
+ * includes a persistent replica being written to by a dfs client or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for the balancing purpose.
+ *
+ * The base class implements a temporary replica
+ */
+public class LocalReplicaInPipeline extends LocalReplica
+ implements ReplicaInPipeline {
+ private long bytesAcked;
+ private long bytesOnDisk;
+ private byte[] lastChecksum;
+ private AtomicReference<Thread> writer = new AtomicReference<Thread>();
+
+ /**
+ * Bytes reserved for this replica on the containing volume.
+ * Based off difference between the estimated maximum block length and
+ * the bytes already written to this block.
+ */
+ private long bytesReserved;
+ private final long originalBytesReserved;
+
+ /**
+ * Constructor for a zero length replica.
+ * @param blockId block id
+ * @param genStamp replica generation stamp
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ * @param bytesToReserve disk space to reserve for this replica, based on
+ * the estimated maximum block length.
+ */
+ public LocalReplicaInPipeline(long blockId, long genStamp,
+ FsVolumeSpi vol, File dir, long bytesToReserve) {
+ this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(),
+ bytesToReserve);
+ }
+
+ /**
+ * Constructor
+ * @param block a block
+ * @param vol volume where replica is located
+ * @param dir directory path where block and meta files are located
+ * @param writer a thread that is writing to this replica
+ */
+ LocalReplicaInPipeline(Block block,
+ FsVolumeSpi vol, File dir, Thread writer) {
+ this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+ vol, dir, writer, 0L);
+ }
+
+ /**
+  * Creates a replica of the given length that is owned by {@code writer}.
+  * Both the acked and the on-disk byte counts start at {@code len}; the
+  * current and the original reservation both start at
+  * {@code bytesToReserve}.
+  *
+  * @param blockId block id
+  * @param len replica length
+  * @param genStamp replica generation stamp
+  * @param vol volume where replica is located
+  * @param dir directory path where block and meta files are located
+  * @param writer a thread that is writing to this replica
+  * @param bytesToReserve disk space to reserve for this replica, based on
+  *          the estimated maximum block length.
+  */
+ LocalReplicaInPipeline(long blockId, long len, long genStamp,
+     FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+   super(blockId, len, genStamp, vol, dir);
+
+   // Reservation bookkeeping: the original amount is kept separately from
+   // the running amount, which shrinks as bytes are acknowledged.
+   this.originalBytesReserved = bytesToReserve;
+   this.bytesReserved = bytesToReserve;
+
+   // Any pre-existing length counts as both on disk and acknowledged.
+   this.bytesOnDisk = len;
+   this.bytesAcked = len;
+
+   this.writer.set(writer);
+ }
+
+ /**
+ * Copy constructor.
+ * @param from where to copy from
+ */
+ public LocalReplicaInPipeline(LocalReplicaInPipeline from) {
+ super(from);
+ this.bytesAcked = from.getBytesAcked();
+ this.bytesOnDisk = from.getBytesOnDisk();
+ this.writer.set(from.writer.get());
+ this.bytesReserved = from.bytesReserved;
+ this.originalBytesReserved = from.originalBytesReserved;
+ }
+
+ @Override
+ public long getVisibleLength() {
+ return -1;
+ }
+
+ @Override //ReplicaInfo
+ public ReplicaState getState() {
+ return ReplicaState.TEMPORARY;
+ }
+
+ @Override // ReplicaInPipeline
+ public long getBytesAcked() {
+ return bytesAcked;
+ }
+
+ /**
+  * Records the new acknowledged byte count and gives back to the volume the
+  * reservation that corresponds to the bytes acknowledged since the
+  * previous call.
+  *
+  * @param bytesAcked total number of bytes acknowledged so far
+  */
+ @Override // ReplicaInPipeline
+ public void setBytesAcked(long bytesAcked) {
+   // Difference to the previously recorded value, not an absolute count.
+   final long ackedDelta = bytesAcked - this.bytesAcked;
+   this.bytesAcked = bytesAcked;
+
+   // Once bytes are ACK'ed we can release equivalent space from the
+   // volume's reservedForRbw count. We could have released it as soon
+   // as the write-to-disk completed but that would be inefficient.
+   getVolume().releaseReservedSpace(ackedDelta);
+   bytesReserved = bytesReserved - ackedDelta;
+ }
+
+ @Override // ReplicaInPipeline
+ public long getBytesOnDisk() {
+ return bytesOnDisk;
+ }
+
+ @Override
+ public long getBytesReserved() {
+ return bytesReserved;
+ }
+
+ @Override
+ public long getOriginalBytesReserved() {
+ return originalBytesReserved;
+ }
+
+ /**
+ * Releases whatever is still reserved for this replica: the current
+ * {@code bytesReserved} value is handed to both
+ * {@code releaseReservedSpace} and {@code releaseLockedMemory} on the
+ * volume, and the counter is then reset to zero.
+ */
+ @Override // ReplicaInPipeline
+ public void releaseAllBytesReserved() {
+ getVolume().releaseReservedSpace(bytesReserved);
+ getVolume().releaseLockedMemory(bytesReserved);
+ // Nothing is left to release after this point.
+ bytesReserved = 0;
+ }
+
+ @Override // ReplicaInPipeline
+ public synchronized void setLastChecksumAndDataLen(long dataLength,
+ byte[] checksum) {
+ this.bytesOnDisk = dataLength;
+ this.lastChecksum = checksum;
+ }
+
+ @Override // ReplicaInPipeline
+ public synchronized ChunkChecksum getLastChecksumAndDataLen() {
+ return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+ }
+
+ @Override // ReplicaInPipeline
+ public void setWriter(Thread writer) {
+ this.writer.set(writer);
+ }
+
+ /**
+  * Interrupts the registered writer thread. Nothing happens when no writer
+  * is registered, when the writer is the calling thread, or when the writer
+  * is no longer alive.
+  */
+ @Override
+ public void interruptThread() {
+   final Thread current = writer.get();
+   if (current == null) {
+     return;
+   }
+   if (current == Thread.currentThread() || !current.isAlive()) {
+     return;
+   }
+   current.interrupt();
+ }
+
+ @Override // Object
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ /**
+ * Attempt to set the writer to a new value.
+ */
+ @Override // ReplicaInPipeline
+ public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+ return writer.compareAndSet(prevWriter, newWriter);
+ }
+
+ /**
+  * Interrupt the writing thread and wait until it dies.
+  * <p>
+  * The method loops until it manages to clear the writer slot. If the slot
+  * is empty, holds the calling thread, or holds a dead thread, it is
+  * cleared with a compare-and-set; when that fails because another writer
+  * was registered in the meantime, the new writer is stopped instead.
+  *
+  * @param xceiverStopTimeout timeout handed to {@link Thread#join(long)}
+  *          while waiting for the writer to exit
+  * @throws IOException if the writer is still alive after the join, or if
+  *           the waiting is interrupted (the {@link InterruptedException}
+  *           is attached as the cause)
+  */
+ @Override // ReplicaInPipeline
+ public void stopWriter(long xceiverStopTimeout) throws IOException {
+   while (true) {
+     Thread thread = writer.get();
+     if ((thread == null) || (thread == Thread.currentThread()) ||
+         (!thread.isAlive())) {
+       if (writer.compareAndSet(thread, null)) {
+         return; // Done
+       }
+       // The writer changed. Go back to the start of the loop and attempt to
+       // stop the new writer.
+       continue;
+     }
+     thread.interrupt();
+     try {
+       thread.join(xceiverStopTimeout);
+       if (thread.isAlive()) {
+         // Our thread join timed out.
+         final String msg = "Join on writer thread " + thread + " timed out";
+         DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
+         throw new IOException(msg);
+       }
+     } catch (InterruptedException e) {
+       // Chain the interruption so its origin is not lost to callers/logs.
+       throw new IOException("Waiting for writer thread is interrupted.", e);
+     }
+   }
+ }
+
+ @Override // Object
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ /**
+ * Opens the block file and the meta file of this replica for writing and
+ * wraps them in a {@link ReplicaOutputStreams}.
+ * <p>
+ * For a create, the requested checksum is used and the streams are left at
+ * their initial position. Otherwise the checksum is read from the existing
+ * meta file header, its bytes-per-checksum must match the requested one,
+ * both files must be at least as long as {@code bytesOnDisk} implies, and
+ * both streams are positioned at those lengths.
+ *
+ * @param isCreate if it is for creation
+ * @param requestedChecksum the checksum the writer would prefer to use
+ * @return output streams for writing
+ * @throws IOException on a chunk-size mismatch, when the on-disk files are
+ * shorter than expected ("Corrupted block"), or on any I/O failure
+ */
+ @Override // ReplicaInPipeline
+ public ReplicaOutputStreams createStreams(boolean isCreate,
+ DataChecksum requestedChecksum) throws IOException {
+ File blockFile = getBlockFile();
+ File metaFile = getMetaFile();
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+ " of size " + blockFile.length());
+ DataNode.LOG.debug("writeTo metafile is " + metaFile +
+ " of size " + metaFile.length());
+ }
+ long blockDiskSize = 0L;
+ long crcDiskSize = 0L;
+
+ // the checksum that should actually be used -- this
+ // may differ from requestedChecksum for appends.
+ final DataChecksum checksum;
+
+ // Opened up front: used to read the header below and, later, as the
+ // descriptor behind the checksum output stream.
+ RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+
+ if (!isCreate) {
+ // For append or recovery, we must enforce the existing checksum.
+ // Also, verify that the file has correct lengths, etc.
+ boolean checkedMeta = false;
+ try {
+ BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+ checksum = header.getChecksum();
+
+ if (checksum.getBytesPerChecksum() !=
+ requestedChecksum.getBytesPerChecksum()) {
+ throw new IOException("Client requested checksum " +
+ requestedChecksum + " when appending to an existing block " +
+ "with different chunk size: " + checksum);
+ }
+
+ int bytesPerChunk = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+
+ blockDiskSize = bytesOnDisk;
+ // header + one checksum per chunk, rounding the chunk count up
+ crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+ (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+ // Either file being shorter than what bytesOnDisk implies is treated
+ // as corruption; an empty replica skips the check.
+ if (blockDiskSize > 0 &&
+ (blockDiskSize > blockFile.length() ||
+ crcDiskSize>metaFile.length())) {
+ throw new IOException("Corrupted block: " + this);
+ }
+ checkedMeta = true;
+ } finally {
+ // checkedMeta stays false on every exceptional exit of the try block.
+ if (!checkedMeta) {
+ // clean up in case of exceptions.
+ IOUtils.closeStream(metaRAF);
+ }
+ }
+ } else {
+ // for create, we can use the requested checksum
+ checksum = requestedChecksum;
+ }
+
+ FileOutputStream blockOut = null;
+ FileOutputStream crcOut = null;
+ try {
+ blockOut = new FileOutputStream(
+ new RandomAccessFile(blockFile, "rw").getFD());
+ // crcOut is built on metaRAF's file descriptor.
+ crcOut = new FileOutputStream(metaRAF.getFD());
+ if (!isCreate) {
+ // Continue writing right after the verified on-disk content.
+ blockOut.getChannel().position(blockDiskSize);
+ crcOut.getChannel().position(crcDiskSize);
+ }
+ return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+ getVolume().isTransientStorage());
+ } catch (IOException e) {
+ // crcOut is not closed separately; metaRAF, whose descriptor it was
+ // built on, is closed instead.
+ IOUtils.closeStream(blockOut);
+ IOUtils.closeStream(metaRAF);
+ throw e;
+ }
+ }
+
+ /**
+ * Creates a fresh output stream for the restart meta file of this replica.
+ * The file name is derived from the block file's parent and name; an
+ * existing file with that name is deleted first (a failed delete is only
+ * logged).
+ *
+ * @return output stream for writing the restart meta file
+ * @throws IOException if the file cannot be opened for writing
+ */
+ @Override
+ public OutputStream createRestartMetaStream() throws IOException {
+ File blockFile = getBlockFile();
+ // NOTE(review): File.pathSeparator is the path-list separator (':' on
+ // Unix), not the directory separator File.separator, so the resulting
+ // name is "<parent>:.<block>.restart" rather than a file inside <parent>.
+ // Confirm how the reader of this file builds its name before changing it.
+ File restartMeta = new File(blockFile.getParent() +
+ File.pathSeparator + "." + blockFile.getName() + ".restart");
+ if (restartMeta.exists() && !restartMeta.delete()) {
+ DataNode.LOG.warn("Failed to delete restart meta file: " +
+ restartMeta.getPath());
+ }
+ return new FileOutputStream(restartMeta);
+ }
+
+ /**
+  * Extends the parent description with the acked and the on-disk byte
+  * counts, each on its own line.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder description = new StringBuilder();
+   description.append(super.toString());
+   description.append("\n bytesAcked=").append(bytesAcked);
+   description.append("\n bytesOnDisk=").append(bytesOnDisk);
+   return description.toString();
+ }
+
+ @Override
+ public ReplicaInfo getOriginalReplica() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getOriginalReplica");
+ }
+
+ @Override
+ public long getRecoveryID() {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support getRecoveryID");
+ }
+
+ @Override
+ public void setRecoveryID(long recoveryId) {
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support setRecoveryID");
+ }
+
+ @Override
+ public ReplicaRecoveryInfo createInfo(){
+ throw new UnsupportedOperationException("Replica of type " + getState() +
+ " does not support createInfo");
+ }
+
+ /**
+  * Moves the files of an existing local replica to this replica's location.
+  * The meta file is renamed to {@link #getMetaFile()} first, then the block
+  * file is renamed to {@code newBlkFile}. If the block file rename fails,
+  * the meta file is renamed back on a best-effort basis (a failure there is
+  * only logged) before the error is reported.
+  *
+  * @param oldReplicaInfo replica whose files are moved; must be a
+  *          {@link LocalReplica}
+  * @param newBlkFile destination of the block file
+  * @throws IOException if {@code oldReplicaInfo} is not a
+  *           {@link LocalReplica} or if either rename fails
+  */
+ public void moveReplicaFrom(ReplicaInfo oldReplicaInfo, File newBlkFile)
+     throws IOException {
+
+   if (!(oldReplicaInfo instanceof LocalReplica)) {
+     throw new IOException("The source replica with blk id "
+         + oldReplicaInfo.getBlockId()
+         + " should be derived from LocalReplica");
+   }
+
+   LocalReplica localReplica = (LocalReplica) oldReplicaInfo;
+
+   File oldmeta = localReplica.getMetaFile();
+   File newmeta = getMetaFile();
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+   }
+   try {
+     NativeIO.renameTo(oldmeta, newmeta);
+   } catch (IOException e) {
+     throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+         " Unable to move meta file " + oldmeta +
+         " to rbw dir " + newmeta, e);
+   }
+
+   File blkfile = localReplica.getBlockFile();
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+         + ", file length=" + blkfile.length());
+   }
+   try {
+     NativeIO.renameTo(blkfile, newBlkFile);
+   } catch (IOException e) {
+     // Undo the meta file move so both files stay together at the source.
+     try {
+       NativeIO.renameTo(newmeta, oldmeta);
+     } catch (IOException ex) {
+       // The leading space keeps the path and the following word apart.
+       LOG.warn("Cannot move meta file " + newmeta +
+           " back to the finalized directory " + oldmeta, ex);
+     }
+     throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+         " Unable to move block file " + blkfile +
+         " to rbw dir " + newBlkFile, e);
+   }
+ }
+
+ @Override // ReplicaInPipeline
+ public ReplicaInfo getReplicaInfo() {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
index 4a89493..262533e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
@@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
* Those are the replicas that
* are created in a pipeline initiated by a dfs client.
*/
-public class ReplicaBeingWritten extends ReplicaInPipeline {
+public class ReplicaBeingWritten extends LocalReplicaInPipeline {
/**
- * Constructor for a zero length replica
+ * Constructor for a zero length replica.
* @param blockId block id
* @param genStamp replica generation stamp
* @param vol volume where replica is located
@@ -37,25 +37,25 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
* @param bytesToReserve disk space to reserve for this replica, based on
* the estimated maximum block length.
*/
- public ReplicaBeingWritten(long blockId, long genStamp,
+ public ReplicaBeingWritten(long blockId, long genStamp,
FsVolumeSpi vol, File dir, long bytesToReserve) {
super(blockId, genStamp, vol, dir, bytesToReserve);
}
-
+
/**
- * Constructor
+ * Constructor.
* @param block a block
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
* @param writer a thread that is writing to this replica
*/
- public ReplicaBeingWritten(Block block,
+ public ReplicaBeingWritten(Block block,
FsVolumeSpi vol, File dir, Thread writer) {
- super( block, vol, dir, writer);
+ super(block, vol, dir, writer);
}
/**
- * Constructor
+ * Constructor.
* @param blockId block id
* @param len replica length
* @param genStamp replica generation stamp
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
new file mode 100644
index 0000000..280aaa0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
+/**
+ * This class is to be used as a builder for {@link ReplicaInfo} objects.
+ * The state of the replica is used to determine which object is instantiated.
+ */
+public class ReplicaBuilder {
+
+ private ReplicaState state;
+ private long blockId;
+ private long genStamp;
+ private long length;
+ private FsVolumeSpi volume;
+ private File directoryUsed;
+ private long bytesToReserve;
+ private Thread writer;
+ private long recoveryId;
+ private Block block;
+
+ private ReplicaInfo fromReplica;
+
+ /**
+  * Creates a builder for replicas in the given state. The length starts at
+  * {@code -1}, which the build methods treat as "no length given"; volume,
+  * writer and block start out unset.
+  *
+  * @param state state of the replica to build
+  */
+ public ReplicaBuilder(ReplicaState state) {
+   this.state = state;
+   this.length = -1;
+   this.block = null;
+   this.writer = null;
+   this.volume = null;
+ }
+
+ public ReplicaBuilder setState(ReplicaState state) {
+ this.state = state;
+ return this;
+ }
+
+ public ReplicaBuilder setBlockId(long blockId) {
+ this.blockId = blockId;
+ return this;
+ }
+
+ public ReplicaBuilder setGenerationStamp(long genStamp) {
+ this.genStamp = genStamp;
+ return this;
+ }
+
+ public ReplicaBuilder setLength(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public ReplicaBuilder setFsVolume(FsVolumeSpi volume) {
+ this.volume = volume;
+ return this;
+ }
+
+ public ReplicaBuilder setDirectoryToUse(File dir) {
+ this.directoryUsed = dir;
+ return this;
+ }
+
+ public ReplicaBuilder setBytesToReserve(long bytesToReserve) {
+ this.bytesToReserve = bytesToReserve;
+ return this;
+ }
+
+ public ReplicaBuilder setWriterThread(Thread writer) {
+ this.writer = writer;
+ return this;
+ }
+
+ public ReplicaBuilder from(ReplicaInfo fromReplica) {
+ this.fromReplica = fromReplica;
+ return this;
+ }
+
+ public ReplicaBuilder setRecoveryId(long recoveryId) {
+ this.recoveryId = recoveryId;
+ return this;
+ }
+
+ public ReplicaBuilder setBlock(Block block) {
+ this.block = block;
+ return this;
+ }
+
+ /**
+  * Builds an in-pipeline replica for the configured state.
+  *
+  * @return a replica being written for {@code RBW}, a temporary replica for
+  *         {@code TEMPORARY}
+  * @throws IllegalArgumentException if the state is neither of the two, or
+  *           if the builder's settings are inconsistent
+  */
+ public LocalReplicaInPipeline buildLocalReplicaInPipeline()
+     throws IllegalArgumentException {
+   switch (state) {
+   case RBW:
+     return buildRBW();
+   case TEMPORARY:
+     return buildTemporaryReplica();
+   default:
+     throw new IllegalArgumentException("Unknown replica state " + state);
+   }
+ }
+
+ /**
+  * Builds a {@link ReplicaBeingWritten}. Sources are tried in this order:
+  * a replica to copy from (which must itself be in state RBW), a block plus
+  * writer thread, and finally the individual id/stamp/length settings,
+  * where an unset length selects the zero-length constructor.
+  *
+  * @throws IllegalArgumentException if the replica to copy from is in
+  *           another state, or a block was given without a writer
+  */
+ private LocalReplicaInPipeline buildRBW() throws IllegalArgumentException {
+   if (fromReplica != null) {
+     if (fromReplica.getState() != ReplicaState.RBW) {
+       throw new IllegalArgumentException("Incompatible fromReplica "
+           + "state: " + fromReplica.getState());
+     }
+     return new ReplicaBeingWritten((ReplicaBeingWritten) fromReplica);
+   }
+
+   if (block != null) {
+     if (writer == null) {
+       throw new IllegalArgumentException("A valid writer is "
+           + "required for constructing a RBW from block "
+           + block.getBlockId());
+     }
+     return new ReplicaBeingWritten(block, volume, directoryUsed, writer);
+   }
+
+   if (length == -1) {
+     // No length given: start from a zero-length replica.
+     return new ReplicaBeingWritten(blockId, genStamp, volume,
+         directoryUsed, bytesToReserve);
+   }
+   return new ReplicaBeingWritten(blockId, length, genStamp,
+       volume, directoryUsed, writer, bytesToReserve);
+ }
+
+ /**
+  * Builds a temporary {@link LocalReplicaInPipeline}. Sources are tried in
+  * this order: a replica to copy from (which must itself be in state
+  * TEMPORARY), a block plus writer thread, and finally the individual
+  * id/stamp/length settings, where an unset length selects the zero-length
+  * constructor.
+  *
+  * @throws IllegalArgumentException if the replica to copy from is in
+  *           another state, or a block was given without a writer
+  */
+ private LocalReplicaInPipeline buildTemporaryReplica()
+     throws IllegalArgumentException {
+   if (fromReplica != null) {
+     if (fromReplica.getState() != ReplicaState.TEMPORARY) {
+       throw new IllegalArgumentException("Incompatible fromReplica "
+           + "state: " + fromReplica.getState());
+     }
+     return new LocalReplicaInPipeline((LocalReplicaInPipeline) fromReplica);
+   }
+
+   if (block != null) {
+     if (writer == null) {
+       throw new IllegalArgumentException("A valid writer is "
+           + "required for constructing a Replica from block "
+           + block.getBlockId());
+     }
+     return new LocalReplicaInPipeline(block, volume, directoryUsed,
+         writer);
+   }
+
+   if (length == -1) {
+     // No length given: start from a zero-length replica.
+     return new LocalReplicaInPipeline(blockId, genStamp, volume,
+         directoryUsed, bytesToReserve);
+   }
+   return new LocalReplicaInPipeline(blockId, length, genStamp,
+       volume, directoryUsed, writer, bytesToReserve);
+ }
+
+ /**
+  * Builds a {@link FinalizedReplica}, either as a copy of a finalized
+  * replica, from a block, or from the individual id/length/stamp settings.
+  *
+  * @throws IllegalArgumentException if the replica to copy from is not in
+  *           state FINALIZED
+  */
+ private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+   if (fromReplica != null) {
+     if (fromReplica.getState() != ReplicaState.FINALIZED) {
+       throw new IllegalArgumentException("Incompatible fromReplica "
+           + "state: " + fromReplica.getState());
+     }
+     return new FinalizedReplica((FinalizedReplica) fromReplica);
+   }
+
+   if (block != null) {
+     return new FinalizedReplica(block, volume, directoryUsed);
+   }
+   return new FinalizedReplica(blockId, length, genStamp, volume,
+       directoryUsed);
+ }
+
+ /**
+  * Builds a {@link ReplicaWaitingToBeRecovered}, either as a copy of a
+  * replica in state RWR, from a block, or from the individual
+  * id/length/stamp settings.
+  *
+  * @throws IllegalArgumentException if the replica to copy from is not in
+  *           state RWR
+  */
+ private ReplicaInfo buildRWR() throws IllegalArgumentException {
+   if (fromReplica != null) {
+     if (fromReplica.getState() != ReplicaState.RWR) {
+       throw new IllegalArgumentException("Incompatible fromReplica "
+           + "state: " + fromReplica.getState());
+     }
+     return new ReplicaWaitingToBeRecovered(
+         (ReplicaWaitingToBeRecovered) fromReplica);
+   }
+
+   if (block != null) {
+     return new ReplicaWaitingToBeRecovered(block, volume, directoryUsed);
+   }
+   return new ReplicaWaitingToBeRecovered(blockId, length, genStamp,
+       volume, directoryUsed);
+ }
+
+ private ReplicaInfo buildRUR() throws IllegalArgumentException {
+ if (null == fromReplica) {
+ throw new IllegalArgumentException(
+ "Missing a valid replica to recover from");
+ }
+ if (null != writer || null != block) {
+ throw new IllegalArgumentException("Invalid state for "
+ + "recovering from replica with blk id "
+ + fromReplica.getBlockId());
+ }
+ if (fromReplica.getState() == ReplicaState.RUR) {
+ return new ReplicaUnderRecovery((ReplicaUnderRecovery) fromReplica);
+ } else {
+ return new ReplicaUnderRecovery(fromReplica, recoveryId);
+ }
+ }
+
+ /**
+  * Builds the replica object that matches the configured state.
+  *
+  * @return the replica built for the state; RBW and TEMPORARY are both
+  *         handled by {@link #buildLocalReplicaInPipeline()}
+  * @throws IllegalArgumentException if the state is not handled, or if the
+  *           builder's settings are inconsistent for that state
+  */
+ public ReplicaInfo build() throws IllegalArgumentException {
+   switch (this.state) {
+   case FINALIZED:
+     return buildFinalizedReplica();
+   case RWR:
+     return buildRWR();
+   case RUR:
+     return buildRUR();
+   case RBW:
+   case TEMPORARY:
+     return buildLocalReplicaInPipeline();
+   default:
+     throw new IllegalArgumentException("Unknown replica state " + state);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
index b563d7f..ddc9f9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
@@ -27,11 +27,11 @@ import java.io.IOException;
* the fs volume where this replica is located.
*/
public class ReplicaHandler implements Closeable {
- private final ReplicaInPipelineInterface replica;
+ private final ReplicaInPipeline replica;
private final FsVolumeReference volumeReference;
public ReplicaHandler(
- ReplicaInPipelineInterface replica, FsVolumeReference reference) {
+ ReplicaInPipeline replica, FsVolumeReference reference) {
this.replica = replica;
this.volumeReference = reference;
}
@@ -43,7 +43,7 @@ public class ReplicaHandler implements Closeable {
}
}
- public ReplicaInPipelineInterface getReplica() {
+ public ReplicaInPipeline getReplica() {
return replica;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 7326846..efa6ea6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -17,313 +17,91 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.StringUtils;
/**
- * This class defines a replica in a pipeline, which
- * includes a persistent replica being written to by a dfs client or
- * a temporary replica being replicated by a source datanode or
- * being copied for the balancing purpose.
- *
- * The base class implements a temporary replica
+ * This defines the interface of a replica in Pipeline that's being written to
*/
-public class ReplicaInPipeline extends ReplicaInfo
- implements ReplicaInPipelineInterface {
- private long bytesAcked;
- private long bytesOnDisk;
- private byte[] lastChecksum;
- private AtomicReference<Thread> writer = new AtomicReference<Thread>();
-
+public interface ReplicaInPipeline extends Replica {
/**
- * Bytes reserved for this replica on the containing volume.
- * Based off difference between the estimated maximum block length and
- * the bytes already written to this block.
+ * Set the number of bytes received
+ * @param bytesReceived number of bytes received
*/
- private long bytesReserved;
- private final long originalBytesReserved;
+ void setNumBytes(long bytesReceived);
/**
- * Constructor for a zero length replica
- * @param blockId block id
- * @param genStamp replica generation stamp
- * @param vol volume where replica is located
- * @param dir directory path where block and meta files are located
- * @param bytesToReserve disk space to reserve for this replica, based on
- * the estimated maximum block length.
+ * Get the number of bytes acked
+ * @return the number of bytes acked
*/
- public ReplicaInPipeline(long blockId, long genStamp,
- FsVolumeSpi vol, File dir, long bytesToReserve) {
- this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
- }
+ long getBytesAcked();
/**
- * Constructor
- * @param block a block
- * @param vol volume where replica is located
- * @param dir directory path where block and meta files are located
- * @param writer a thread that is writing to this replica
+ * Set the number bytes that have acked
+ * @param bytesAcked number bytes acked
*/
- ReplicaInPipeline(Block block,
- FsVolumeSpi vol, File dir, Thread writer) {
- this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
- vol, dir, writer, 0L);
- }
+ void setBytesAcked(long bytesAcked);
/**
- * Constructor
- * @param blockId block id
- * @param len replica length
- * @param genStamp replica generation stamp
- * @param vol volume where replica is located
- * @param dir directory path where block and meta files are located
- * @param writer a thread that is writing to this replica
- * @param bytesToReserve disk space to reserve for this replica, based on
- * the estimated maximum block length.
+ * Release any disk space reserved for this replica.
*/
- ReplicaInPipeline(long blockId, long len, long genStamp,
- FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
- super( blockId, len, genStamp, vol, dir);
- this.bytesAcked = len;
- this.bytesOnDisk = len;
- this.writer.set(writer);
- this.bytesReserved = bytesToReserve;
- this.originalBytesReserved = bytesToReserve;
- }
+ public void releaseAllBytesReserved();
/**
- * Copy constructor.
- * @param from where to copy from
+ * store the checksum for the last chunk along with the data length
+ * @param dataLength number of bytes on disk
+ * @param lastChecksum - checksum bytes for the last chunk
*/
- public ReplicaInPipeline(ReplicaInPipeline from) {
- super(from);
- this.bytesAcked = from.getBytesAcked();
- this.bytesOnDisk = from.getBytesOnDisk();
- this.writer.set(from.writer.get());
- this.bytesReserved = from.bytesReserved;
- this.originalBytesReserved = from.originalBytesReserved;
- }
-
- @Override
- public long getVisibleLength() {
- return -1;
- }
-
- @Override //ReplicaInfo
- public ReplicaState getState() {
- return ReplicaState.TEMPORARY;
- }
+ public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
- @Override // ReplicaInPipelineInterface
- public long getBytesAcked() {
- return bytesAcked;
- }
+ /**
+ * gets the last chunk checksum and the length of the block corresponding
+ * to that checksum
+ */
+ public ChunkChecksum getLastChecksumAndDataLen();
- @Override // ReplicaInPipelineInterface
- public void setBytesAcked(long bytesAcked) {
- long newBytesAcked = bytesAcked - this.bytesAcked;
- this.bytesAcked = bytesAcked;
+ /**
+ * Create output streams for writing to this replica,
+ * one for block file and one for CRC file
+ *
+ * @param isCreate if it is for creation
+ * @param requestedChecksum the checksum the writer would prefer to use
+ * @return output streams for writing
+ * @throws IOException if any error occurs
+ */
+ public ReplicaOutputStreams createStreams(boolean isCreate,
+ DataChecksum requestedChecksum) throws IOException;
- // Once bytes are ACK'ed we can release equivalent space from the
- // volume's reservedForRbw count. We could have released it as soon
- // as the write-to-disk completed but that would be inefficient.
- getVolume().releaseReservedSpace(newBytesAcked);
- bytesReserved -= newBytesAcked;
- }
+ /**
+ * Create an output stream to write restart metadata in case of datanode
+ * shutting down for quick restart.
+ *
+ * @return output stream for writing.
+ * @throws IOException if any error occurs
+ */
+ public OutputStream createRestartMetaStream() throws IOException;
- @Override // ReplicaInPipelineInterface
- public long getBytesOnDisk() {
- return bytesOnDisk;
- }
-
- @Override
- public long getBytesReserved() {
- return bytesReserved;
- }
+ ReplicaInfo getReplicaInfo();
- @Override
- public long getOriginalBytesReserved() {
- return originalBytesReserved;
- }
-
- @Override
- public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
- getVolume().releaseReservedSpace(bytesReserved);
- getVolume().releaseLockedMemory(bytesReserved);
- bytesReserved = 0;
- }
-
- @Override // ReplicaInPipelineInterface
- public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
- this.bytesOnDisk = dataLength;
- this.lastChecksum = lastChecksum;
- }
+ /**
+ * Set the thread that is writing to this replica
+ * @param writer a thread writing to this replica
+ */
+ void setWriter(Thread writer);
- @Override // ReplicaInPipelineInterface
- public synchronized ChunkChecksum getLastChecksumAndDataLen() {
- return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
- }
-
- public void interruptThread() {
- Thread thread = writer.get();
- if (thread != null && thread != Thread.currentThread()
- && thread.isAlive()) {
- thread.interrupt();
- }
- }
-
- @Override // Object
- public boolean equals(Object o) {
- return super.equals(o);
- }
+ void interruptThread();
/**
* Attempt to set the writer to a new value.
*/
- public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
- return writer.compareAndSet(prevWriter, newWriter);
- }
+ boolean attemptToSetWriter(Thread prevWriter, Thread newWriter);
/**
- * Interrupt the writing thread and wait until it dies
+ * Interrupt the writing thread and wait until it dies.
* @throws IOException the waiting is interrupted
*/
- public void stopWriter(long xceiverStopTimeout) throws IOException {
- while (true) {
- Thread thread = writer.get();
- if ((thread == null) || (thread == Thread.currentThread()) ||
- (!thread.isAlive())) {
- if (writer.compareAndSet(thread, null) == true) {
- return; // Done
- }
- // The writer changed. Go back to the start of the loop and attempt to
- // stop the new writer.
- continue;
- }
- thread.interrupt();
- try {
- thread.join(xceiverStopTimeout);
- if (thread.isAlive()) {
- // Our thread join timed out.
- final String msg = "Join on writer thread " + thread + " timed out";
- DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
- throw new IOException(msg);
- }
- } catch (InterruptedException e) {
- throw new IOException("Waiting for writer thread is interrupted.");
- }
- }
- }
-
- @Override // Object
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override // ReplicaInPipelineInterface
- public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException {
- File blockFile = getBlockFile();
- File metaFile = getMetaFile();
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("writeTo blockfile is " + blockFile +
- " of size " + blockFile.length());
- DataNode.LOG.debug("writeTo metafile is " + metaFile +
- " of size " + metaFile.length());
- }
- long blockDiskSize = 0L;
- long crcDiskSize = 0L;
-
- // the checksum that should actually be used -- this
- // may differ from requestedChecksum for appends.
- final DataChecksum checksum;
-
- RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-
- if (!isCreate) {
- // For append or recovery, we must enforce the existing checksum.
- // Also, verify that the file has correct lengths, etc.
- boolean checkedMeta = false;
- try {
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
- checksum = header.getChecksum();
-
- if (checksum.getBytesPerChecksum() !=
- requestedChecksum.getBytesPerChecksum()) {
- throw new IOException("Client requested checksum " +
- requestedChecksum + " when appending to an existing block " +
- "with different chunk size: " + checksum);
- }
-
- int bytesPerChunk = checksum.getBytesPerChecksum();
- int checksumSize = checksum.getChecksumSize();
-
- blockDiskSize = bytesOnDisk;
- crcDiskSize = BlockMetadataHeader.getHeaderSize() +
- (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
- if (blockDiskSize>0 &&
- (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
- throw new IOException("Corrupted block: " + this);
- }
- checkedMeta = true;
- } finally {
- if (!checkedMeta) {
- // clean up in case of exceptions.
- IOUtils.closeStream(metaRAF);
- }
- }
- } else {
- // for create, we can use the requested checksum
- checksum = requestedChecksum;
- }
-
- FileOutputStream blockOut = null;
- FileOutputStream crcOut = null;
- try {
- blockOut = new FileOutputStream(
- new RandomAccessFile( blockFile, "rw" ).getFD() );
- crcOut = new FileOutputStream(metaRAF.getFD() );
- if (!isCreate) {
- blockOut.getChannel().position(blockDiskSize);
- crcOut.getChannel().position(crcDiskSize);
- }
- return new ReplicaOutputStreams(blockOut, crcOut, checksum,
- getVolume().isTransientStorage());
- } catch (IOException e) {
- IOUtils.closeStream(blockOut);
- IOUtils.closeStream(metaRAF);
- throw e;
- }
- }
-
- @Override
- public OutputStream createRestartMetaStream() throws IOException {
- File blockFile = getBlockFile();
- File restartMeta = new File(blockFile.getParent() +
- File.pathSeparator + "." + blockFile.getName() + ".restart");
- if (restartMeta.exists() && !restartMeta.delete()) {
- DataNode.LOG.warn("Failed to delete restart meta file: " +
- restartMeta.getPath());
- }
- return new FileOutputStream(restartMeta);
- }
-
- @Override
- public String toString() {
- return super.toString()
- + "\n bytesAcked=" + bytesAcked
- + "\n bytesOnDisk=" + bytesOnDisk;
- }
+ void stopWriter(long xceiverStopTimeout) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86c9862b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
deleted file mode 100644
index ef9f3e2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.util.DataChecksum;
-
-/**
- * This defines the interface of a replica in Pipeline that's being written to
- */
-public interface ReplicaInPipelineInterface extends Replica {
- /**
- * Set the number of bytes received
- * @param bytesReceived number of bytes received
- */
- void setNumBytes(long bytesReceived);
-
- /**
- * Get the number of bytes acked
- * @return the number of bytes acked
- */
- long getBytesAcked();
-
- /**
- * Set the number bytes that have acked
- * @param bytesAcked number bytes acked
- */
- void setBytesAcked(long bytesAcked);
-
- /**
- * Release any disk space reserved for this replica.
- */
- public void releaseAllBytesReserved();
-
- /**
- * store the checksum for the last chunk along with the data length
- * @param dataLength number of bytes on disk
- * @param lastChecksum - checksum bytes for the last chunk
- */
- public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
-
- /**
- * gets the last chunk checksum and the length of the block corresponding
- * to that checksum
- */
- public ChunkChecksum getLastChecksumAndDataLen();
-
- /**
- * Create output streams for writing to this replica,
- * one for block file and one for CRC file
- *
- * @param isCreate if it is for creation
- * @param requestedChecksum the checksum the writer would prefer to use
- * @return output streams for writing
- * @throws IOException if any error occurs
- */
- public ReplicaOutputStreams createStreams(boolean isCreate,
- DataChecksum requestedChecksum) throws IOException;
-
- /**
- * Create an output stream to write restart metadata in case of datanode
- * shutting down for quick restart.
- *
- * @return output stream for writing.
- * @throws IOException if any error occurs
- */
- public OutputStream createRestartMetaStream() throws IOException;
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org