You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/12/19 00:29:05 UTC
svn commit: r1552162 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/
Author: cmccabe
Date: Wed Dec 18 23:29:05 2013
New Revision: 1552162
URL: http://svn.apache.org/r1552162
Log:
HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec 18 23:29:05 2013
@@ -241,9 +241,6 @@ Trunk (Unreleased)
HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe)
- HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
- (cmccabe)
-
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@@ -754,6 +751,9 @@ Release 2.4.0 - UNRELEASED
FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
change Some fields in FSEditLog to final. (szetszwo)
+ HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+ (cmccabe)
+
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -809,6 +809,8 @@ Release 2.4.0 - UNRELEASED
HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
(Binglin Chang via junping_du)
+ HDFS-5676. Fix inconsistent synchronization of CachingStrategy (cmccabe)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Dec 18 23:29:05 2013
@@ -228,7 +228,7 @@ implements ByteBufferReadable, CanSetDro
dfsClient.getConf().shortCircuitStreamsCacheSize,
dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
this.cachingStrategy =
- dfsClient.getDefaultReadCachingStrategy().duplicate();
+ dfsClient.getDefaultReadCachingStrategy();
openInfo();
}
@@ -574,7 +574,7 @@ implements ByteBufferReadable, CanSetDro
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
- buffersize, verifyChecksum, dfsClient.clientName);
+ buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
" for " + blk);
@@ -928,7 +928,11 @@ implements ByteBufferReadable, CanSetDro
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
- block = getBlockAt(block.getStartOffset(), false);
+ CachingStrategy curCachingStrategy;
+ synchronized (this) {
+ block = getBlockAt(block.getStartOffset(), false);
+ curCachingStrategy = cachingStrategy;
+ }
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
@@ -940,7 +944,7 @@ implements ByteBufferReadable, CanSetDro
int len = (int) (end - start + 1);
reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
blockToken, start, len, buffersize, verifyChecksum,
- dfsClient.clientName);
+ dfsClient.clientName, curCachingStrategy);
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -1053,6 +1057,7 @@ implements ByteBufferReadable, CanSetDro
* @param bufferSize The IO buffer size (not the client buffer size)
* @param verifyChecksum Whether to verify checksum
* @param clientName Client name
+ * @param curCachingStrategy caching strategy to use
* @return New BlockReader instance
*/
protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@@ -1064,7 +1069,8 @@ implements ByteBufferReadable, CanSetDro
long len,
int bufferSize,
boolean verifyChecksum,
- String clientName)
+ String clientName,
+ CachingStrategy curCachingStrategy)
throws IOException {
// Firstly, we check to see if we have cached any file descriptors for
// local blocks. If so, we can just re-use those file descriptors.
@@ -1084,7 +1090,7 @@ implements ByteBufferReadable, CanSetDro
setBlockMetadataHeader(BlockMetadataHeader.
preadHeader(fis[1].getChannel())).
setFileInputStreamCache(fileInputStreamCache).
- setCachingStrategy(cachingStrategy).
+ setCachingStrategy(curCachingStrategy).
build();
}
@@ -1119,7 +1125,7 @@ implements ByteBufferReadable, CanSetDro
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
- allowShortCircuitLocalReads, cachingStrategy);
+ allowShortCircuitLocalReads, curCachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1142,7 +1148,7 @@ implements ByteBufferReadable, CanSetDro
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
- allowShortCircuitLocalReads, cachingStrategy);
+ allowShortCircuitLocalReads, curCachingStrategy);
return reader;
} catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1166,7 +1172,7 @@ implements ByteBufferReadable, CanSetDro
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false,
- cachingStrategy);
+ curCachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1186,7 +1192,7 @@ implements ByteBufferReadable, CanSetDro
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false,
- cachingStrategy);
+ curCachingStrategy);
}
@@ -1460,14 +1466,18 @@ implements ByteBufferReadable, CanSetDro
@Override
public synchronized void setReadahead(Long readahead)
throws IOException {
- this.cachingStrategy.setReadahead(readahead);
+ this.cachingStrategy =
+ new CachingStrategy.Builder(this.cachingStrategy).
+ setReadahead(readahead).build();
closeCurrentBlockReader();
}
@Override
public synchronized void setDropBehind(Boolean dropBehind)
throws IOException {
- this.cachingStrategy.setDropBehind(dropBehind);
+ this.cachingStrategy =
+ new CachingStrategy.Builder(this.cachingStrategy).
+ setDropBehind(dropBehind).build();
closeCurrentBlockReader();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Dec 18 23:29:05 2013
@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSO
private Progressable progress;
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
- private CachingStrategy cachingStrategy;
+ private AtomicReference<CachingStrategy> cachingStrategy;
private boolean failPacket = false;
private static class Packet {
@@ -1183,7 +1183,7 @@ public class DFSOutputStream extends FSO
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
- cachingStrategy);
+ cachingStrategy.get());
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1378,8 +1378,8 @@ public class DFSOutputStream extends FSO
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.progress = progress;
- this.cachingStrategy =
- dfsClient.getDefaultWriteCachingStrategy().duplicate();
+ this.cachingStrategy = new AtomicReference<CachingStrategy>(
+ dfsClient.getDefaultWriteCachingStrategy());
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src);
@@ -1993,7 +1993,14 @@ public class DFSOutputStream extends FSO
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
- this.cachingStrategy.setDropBehind(dropBehind);
+ CachingStrategy prevStrategy, nextStrategy;
+ // CachingStrategy is immutable. So build a new CachingStrategy with the
+ // modifications we want, and compare-and-swap it in.
+ do {
+ prevStrategy = this.cachingStrategy.get();
+ nextStrategy = new CachingStrategy.Builder(prevStrategy).
+ setDropBehind(dropBehind).build();
+ } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
}
@VisibleForTesting
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java Wed Dec 18 23:29:05 2013
@@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.da
* The caching strategy we should use for an HDFS read or write operation.
*/
public class CachingStrategy {
- private Boolean dropBehind; // null = use server defaults
- private Long readahead; // null = use server defaults
+ private final Boolean dropBehind; // null = use server defaults
+ private final Long readahead; // null = use server defaults
public static CachingStrategy newDefaultStrategy() {
return new CachingStrategy(null, null);
@@ -32,8 +32,28 @@ public class CachingStrategy {
return new CachingStrategy(true, null);
}
- public CachingStrategy duplicate() {
- return new CachingStrategy(this.dropBehind, this.readahead);
+ public static class Builder {
+ private Boolean dropBehind;
+ private Long readahead;
+
+ public Builder(CachingStrategy prev) {
+ this.dropBehind = prev.dropBehind;
+ this.readahead = prev.readahead;
+ }
+
+ public Builder setDropBehind(Boolean dropBehind) {
+ this.dropBehind = dropBehind;
+ return this;
+ }
+
+ public Builder setReadahead(Long readahead) {
+ this.readahead = readahead;
+ return this;
+ }
+
+ public CachingStrategy build() {
+ return new CachingStrategy(dropBehind, readahead);
+ }
}
public CachingStrategy(Boolean dropBehind, Long readahead) {
@@ -45,18 +65,10 @@ public class CachingStrategy {
return dropBehind;
}
- public void setDropBehind(Boolean dropBehind) {
- this.dropBehind = dropBehind;
- }
-
public Long getReadahead() {
return readahead;
}
- public void setReadahead(Long readahead) {
- this.readahead = readahead;
- }
-
public String toString() {
return "CachingStrategy(dropBehind=" + dropBehind +
", readahead=" + readahead + ")";
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Dec 18 23:29:05 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
@@ -138,7 +139,8 @@ public class TestConnCache {
Matchers.anyLong(),
Matchers.anyInt(),
Matchers.anyBoolean(),
- Matchers.anyString());
+ Matchers.anyString(),
+ (CachingStrategy)Matchers.anyObject());
// Initial read
pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);