Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/12/19 00:29:05 UTC

svn commit: r1552162 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/

Author: cmccabe
Date: Wed Dec 18 23:29:05 2013
New Revision: 1552162

URL: http://svn.apache.org/r1552162
Log:
HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
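
In outline, the patch makes CachingStrategy immutable, adds a CachingStrategy.Builder for deriving modified copies, keeps DFSOutputStream's copy in an AtomicReference that is updated with compareAndSet, and has DFSInputStream reassign its field only while holding the stream lock (see the diffs below). The following is a minimal, self-contained sketch of that copy-on-write idiom; the class and field names are illustrative stand-ins, not the HDFS types.

    import java.util.concurrent.atomic.AtomicReference;

    // Minimal sketch of the pattern this change applies: an immutable settings
    // object plus copy-on-write updates via AtomicReference.  Names here are
    // illustrative stand-ins, not the HDFS classes.
    public class ImmutableSettingsSketch {
      static final class Settings {
        final Boolean dropBehind;   // null = use server defaults
        final Long readahead;       // null = use server defaults
        Settings(Boolean dropBehind, Long readahead) {
          this.dropBehind = dropBehind;
          this.readahead = readahead;
        }
      }

      // Shared between the user-facing setter and the data-transfer path.
      private final AtomicReference<Settings> settings =
          new AtomicReference<Settings>(new Settings(null, null));

      public void setDropBehind(Boolean dropBehind) {
        Settings prev, next;
        do {
          prev = settings.get();
          next = new Settings(dropBehind, prev.readahead);
        } while (!settings.compareAndSet(prev, next));   // retry if another update raced in
      }

      public Settings snapshot() {
        return settings.get();   // safe to hand to a long-running read: it never mutates
      }

      public static void main(String[] args) {
        ImmutableSettingsSketch s = new ImmutableSettingsSketch();
        s.setDropBehind(Boolean.TRUE);
        System.out.println("dropBehind=" + s.snapshot().dropBehind);   // prints dropBehind=true
      }
    }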

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec 18 23:29:05 2013
@@ -241,9 +241,6 @@ Trunk (Unreleased)
     HDFS-5431. Support cachepool-based limit management in path-based caching
     (awang via cmccabe)
 
-    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
-    (cmccabe)
-
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
@@ -754,6 +751,9 @@ Release 2.4.0 - UNRELEASED
     FSEditLogOp; change FSEditLogOpCodes.fromByte(..) to be more efficient; and
     change Some fields in FSEditLog to final.  (szetszwo)
 
+    HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
+    (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -809,6 +809,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
     (Binglin Chang via junping_du)
 
+    HDFS-5676. fix inconsistent synchronization of CachingStrategy (cmccabe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Dec 18 23:29:05 2013
@@ -228,7 +228,7 @@ implements ByteBufferReadable, CanSetDro
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy().duplicate();
+        dfsClient.getDefaultReadCachingStrategy();
     openInfo();
   }
 
@@ -574,7 +574,7 @@ implements ByteBufferReadable, CanSetDro
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
             accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum, dfsClient.clientName);
+            buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
                              " for " + blk);
@@ -928,7 +928,11 @@ implements ByteBufferReadable, CanSetDro
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the 
       // start of the loop.
-      block = getBlockAt(block.getStartOffset(), false);
+      CachingStrategy curCachingStrategy;
+      synchronized (this) {
+        block = getBlockAt(block.getStartOffset(), false);
+        curCachingStrategy = cachingStrategy;
+      }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
@@ -940,7 +944,7 @@ implements ByteBufferReadable, CanSetDro
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName);
+            dfsClient.clientName, curCachingStrategy);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -1053,6 +1057,7 @@ implements ByteBufferReadable, CanSetDro
    * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param curCachingStrategy  caching strategy to use
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@@ -1064,7 +1069,8 @@ implements ByteBufferReadable, CanSetDro
                                        long len,
                                        int bufferSize,
                                        boolean verifyChecksum,
-                                       String clientName)
+                                       String clientName,
+                                       CachingStrategy curCachingStrategy)
       throws IOException {
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.
@@ -1084,7 +1090,7 @@ implements ByteBufferReadable, CanSetDro
           setBlockMetadataHeader(BlockMetadataHeader.
               preadHeader(fis[1].getChannel())).
           setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(cachingStrategy).
+          setCachingStrategy(curCachingStrategy).
           build();
     }
     
@@ -1119,7 +1125,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1142,7 +1148,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1166,7 +1172,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache, false,
-            cachingStrategy);
+            curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1186,7 +1192,7 @@ implements ByteBufferReadable, CanSetDro
         dfsClient.getConf(), file, block, blockToken, startOffset,
         len, verifyChecksum, clientName, peer, chosenNode, 
         dsFactory, peerCache, fileInputStreamCache, false,
-        cachingStrategy);
+        curCachingStrategy);
   }
 
 
@@ -1460,14 +1466,18 @@ implements ByteBufferReadable, CanSetDro
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy.setReadahead(readahead);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setReadahead(readahead).build();
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setDropBehind(dropBehind).build();
     closeCurrentBlockReader();
   }
 

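On the read side the field stays an ordinary reference, but it is now only reassigned, never mutated, inside synchronized methods, and the read paths copy it into a local (curCachingStrategy) before doing the actual block read outside the lock. A rough, self-contained sketch of that snapshot-then-use idiom follows; the names are illustrative, not the DFSInputStream internals.

    // Sketch of the reader-side idiom: reassign under the lock, snapshot into a
    // local, then do the slow work outside the lock.  Illustrative names only.
    public class ReaderSketch {
      static final class Settings {
        final Long readahead;
        Settings(Long readahead) { this.readahead = readahead; }
      }

      private Settings current = new Settings(null);   // guarded by "this"

      public synchronized void setReadahead(Long readahead) {
        current = new Settings(readahead);             // replace the object, never mutate it
      }

      public void readChunk() {
        Settings snapshot;
        synchronized (this) {
          snapshot = current;                          // consistent copy taken under the lock
        }
        doSlowRead(snapshot);                          // long-running I/O happens unlocked
      }

      private void doSlowRead(Settings s) {
        System.out.println("reading with readahead=" + s.readahead);
      }

      public static void main(String[] args) {
        ReaderSketch r = new ReaderSketch();
        r.setReadahead(4096L);
        r.readChunk();                                 // prints: reading with readahead=4096
      }
    }
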
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Dec 18 23:29:05 2013
@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSO
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private CachingStrategy cachingStrategy;
+  private AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
   
   private static class Packet {
@@ -1183,7 +1183,7 @@ public class DFSOutputStream extends FSO
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy);
+              cachingStrategy.get());
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1378,8 +1378,8 @@ public class DFSOutputStream extends FSO
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
-    this.cachingStrategy =
-        dfsClient.getDefaultWriteCachingStrategy().duplicate();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1993,7 +1993,14 @@ public class DFSOutputStream extends FSO
 
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable.  So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+                        setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
   }
 
   @VisibleForTesting

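The comment in setDropBehind above explains the retry loop: because the strategy is immutable, an update builds a fresh copy and installs it with compareAndSet, retrying if another thread swapped in its own copy first. Below is a small, self-contained demonstration of why the loop matters; it is illustrative code with made-up names, not HDFS code. With two writers updating different fields, a plain get-then-set could lose one of the updates, while the compare-and-swap loop preserves both.

    import java.util.concurrent.atomic.AtomicReference;

    // Why the retry loop: compareAndSet only installs the new object if nobody
    // else got there first, so each writer retries until its change lands.
    public class CasRetryDemo {
      static final class Pair {
        final long a, b;
        Pair(long a, long b) { this.a = a; this.b = b; }
      }

      static final AtomicReference<Pair> ref = new AtomicReference<Pair>(new Pair(0, 0));

      static void bump(final boolean first) {
        Pair prev, next;
        do {
          prev = ref.get();
          next = first ? new Pair(prev.a + 1, prev.b) : new Pair(prev.a, prev.b + 1);
        } while (!ref.compareAndSet(prev, next));      // retry until our copy is installed
      }

      public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread() { public void run() { for (int i = 0; i < 100000; i++) bump(true); } };
        Thread t2 = new Thread() { public void run() { for (int i = 0; i < 100000; i++) bump(false); } };
        t1.start(); t2.start();
        t1.join(); t2.join();
        // Both counters reach 100000: no update was lost.
        System.out.println(ref.get().a + " " + ref.get().b);
      }
    }
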
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CachingStrategy.java Wed Dec 18 23:29:05 2013
@@ -21,8 +21,8 @@ package org.apache.hadoop.hdfs.server.da
  * The caching strategy we should use for an HDFS read or write operation.
  */
 public class CachingStrategy {
-  private Boolean dropBehind; // null = use server defaults
-  private Long readahead; // null = use server defaults
+  private final Boolean dropBehind; // null = use server defaults
+  private final Long readahead; // null = use server defaults
   
   public static CachingStrategy newDefaultStrategy() {
     return new CachingStrategy(null, null);
@@ -32,8 +32,28 @@ public class CachingStrategy {
     return new CachingStrategy(true, null);
   }
 
-  public CachingStrategy duplicate() {
-    return new CachingStrategy(this.dropBehind, this.readahead);
+  public static class Builder {
+    private Boolean dropBehind;
+    private Long readahead;
+
+    public Builder(CachingStrategy prev) {
+      this.dropBehind = prev.dropBehind;
+      this.readahead = prev.readahead;
+    }
+
+    public Builder setDropBehind(Boolean dropBehind) {
+      this.dropBehind = dropBehind;
+      return this;
+    }
+
+    public Builder setReadahead(Long readahead) {
+      this.readahead = readahead;
+      return this;
+    }
+
+    public CachingStrategy build() {
+      return new CachingStrategy(dropBehind, readahead);
+    }
   }
 
   public CachingStrategy(Boolean dropBehind, Long readahead) {
@@ -45,18 +65,10 @@ public class CachingStrategy {
     return dropBehind;
   }
   
-  public void setDropBehind(Boolean dropBehind) {
-    this.dropBehind = dropBehind;
-  }
-  
   public Long getReadahead() {
     return readahead;
   }
 
-  public void setReadahead(Long readahead) {
-    this.readahead = readahead;
-  }
-
   public String toString() {
     return "CachingStrategy(dropBehind=" + dropBehind +
         ", readahead=" + readahead + ")";

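With the setters removed, callers derive modified copies through the new Builder. A small usage sketch against the API added above; the wrapper class name and the drop-behind/readahead values are just examples.

    import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;

    public class BuilderUsage {
      public static void main(String[] args) {
        CachingStrategy defaults = CachingStrategy.newDefaultStrategy();
        // Derive a modified copy instead of mutating the shared instance.
        CachingStrategy tuned = new CachingStrategy.Builder(defaults).
            setDropBehind(true).
            setReadahead(4194304L).
            build();

        System.out.println(defaults);   // CachingStrategy(dropBehind=null, readahead=null)
        System.out.println(tuned);      // CachingStrategy(dropBehind=true, readahead=4194304)
      }
    }
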
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1552162&r1=1552161&r2=1552162&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Wed Dec 18 23:29:05 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
@@ -138,7 +139,8 @@ public class TestConnCache {
                            Matchers.anyLong(),
                            Matchers.anyInt(),
                            Matchers.anyBoolean(),
-                           Matchers.anyString());
+                           Matchers.anyString(),
+                           (CachingStrategy)Matchers.anyObject());
 
     // Initial read
     pread(in, 0, dataBuf, 0, dataBuf.length, authenticData);