You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/01 09:01:51 UTC

[34/50] [abbrv] hadoop git commit: HDFS-8283. DataStreamer cleanup and some minor improvement. Contributed by Tsz Wo Nicholas Sze.

HDFS-8283. DataStreamer cleanup and some minor improvement. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7947e5b5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7947e5b5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7947e5b5

Branch: refs/heads/HDFS-7240
Commit: 7947e5b53b9ac9524b535b0384c1c355b74723ff
Parents: 8f82970
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Apr 29 10:41:46 2015 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Apr 29 10:41:46 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/io/MultipleIOException.java   |  26 ++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  30 +--
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 235 ++++++++++---------
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |   3 +-
 5 files changed, 162 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7947e5b5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java
index 5e584c9c..66c1ab1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MultipleIOException.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.io;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -51,4 +52,29 @@ public class MultipleIOException extends IOException {
     }
     return new MultipleIOException(exceptions);
   }
+
+  /**
+   * Build an {@link IOException} using {@link MultipleIOException}
+   * if there are more than one.
+   */
+  public static class Builder {
+    private List<IOException> exceptions;
+    
+    /** Add the given {@link Throwable} to the exception list. */
+    public void add(Throwable t) {
+      if (exceptions == null) {
+        exceptions = new ArrayList<>();
+      }
+      exceptions.add(t instanceof IOException? (IOException)t
+          : new IOException(t));
+    }
+
+    /**
+     * @return null if nothing is added to this builder;
+     *         otherwise, return an {@link IOException}
+     */
+    public IOException build() {
+      return createIOException(exceptions);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7947e5b5/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e7fa8fd..2dde356 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -480,6 +480,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
 
+    HDFS-8283. DataStreamer cleanup and some minor improvement. (szetszwo via
+    jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7947e5b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index d9b8ee7..4646b60 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -139,8 +139,7 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   protected void checkClosed() throws IOException {
     if (isClosed()) {
-      IOException e = streamer.getLastException().get();
-      throw e != null ? e : new ClosedChannelException();
+      streamer.getLastException().throwException4Close();
     }
   }
 
@@ -216,10 +215,7 @@ public class DFSOutputStream extends FSOutputSummer
     computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum);
 
     streamer = new DataStreamer(stat, null, dfsClient, src, progress, checksum,
-        cachingStrategy, byteArrayManager);
-    if (favoredNodes != null && favoredNodes.length != 0) {
-      streamer.setFavoredNodes(favoredNodes);
-    }
+        cachingStrategy, byteArrayManager, favoredNodes);
   }
 
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
@@ -282,7 +278,8 @@ public class DFSOutputStream extends FSOutputSummer
   /** Construct a new output stream for append. */
   private DFSOutputStream(DFSClient dfsClient, String src,
       EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
-      HdfsFileStatus stat, DataChecksum checksum) throws IOException {
+      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
+          throws IOException {
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
     this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
@@ -303,7 +300,8 @@ public class DFSOutputStream extends FSOutputSummer
       computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
           bytesPerChecksum);
       streamer = new DataStreamer(stat, lastBlock != null ? lastBlock.getBlock() : null,
-          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager);
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+          favoredNodes);
     }
   }
 
@@ -351,10 +349,7 @@ public class DFSOutputStream extends FSOutputSummer
         dfsClient.getPathTraceScope("newStreamForAppend", src);
     try {
       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
-          progress, lastBlock, stat, checksum);
-      if (favoredNodes != null && favoredNodes.length != 0) {
-        out.streamer.setFavoredNodes(favoredNodes);
-      }
+          progress, lastBlock, stat, checksum, favoredNodes);
       out.start();
       return out;
     } finally {
@@ -653,7 +648,7 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
         if (!isClosed()) {
-          streamer.getLastException().set(new IOException("IOException flush: " + e));
+          streamer.getLastException().set(e);
           closeThreads(true);
         }
       }
@@ -720,7 +715,7 @@ public class DFSOutputStream extends FSOutputSummer
     if (isClosed()) {
       return;
     }
-    streamer.setLastException(new IOException("Lease timeout of "
+    streamer.getLastException().set(new IOException("Lease timeout of "
         + (dfsClient.getConf().getHdfsTimeout()/1000) + " seconds expired."));
     closeThreads(true);
     dfsClient.endFileLease(fileId);
@@ -767,11 +762,8 @@ public class DFSOutputStream extends FSOutputSummer
 
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      IOException e = streamer.getLastException().getAndSet(null);
-      if (e == null)
-        return;
-      else
-        throw e;
+      streamer.getLastException().check();
+      return;
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7947e5b5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 5f0c9ac..3727d20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -41,6 +41,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -88,6 +91,7 @@ import org.apache.htrace.Trace;
 import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -117,6 +121,7 @@ import com.google.common.cache.RemovalNotification;
 
 @InterfaceAudience.Private
 class DataStreamer extends Daemon {
+  static final Log LOG = LogFactory.getLog(DataStreamer.class);
   /**
    * Create a socket for a write pipeline
    *
@@ -129,8 +134,8 @@ class DataStreamer extends Daemon {
       final int length, final DFSClient client) throws IOException {
     final DfsClientConf conf = client.getConf();
     final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname());
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to datanode " + dnAddr);
     }
     final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
     final Socket sock = client.socketFactory.createSocket();
@@ -138,8 +143,8 @@ class DataStreamer extends Daemon {
     NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout());
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Send buf size " + sock.getSendBufferSize());
     }
     return sock;
   }
@@ -168,6 +173,34 @@ class DataStreamer extends Daemon {
     }
     packets.clear();
   }
+  
+  static class LastException {
+    private Throwable thrown;
+
+    synchronized void set(Throwable t) {
+      Preconditions.checkNotNull(t);
+      Preconditions.checkState(thrown == null);
+      this.thrown = t;
+    }
+
+    synchronized void clear() {
+      thrown = null;
+    }
+
+    /** Check if there already is an exception. */
+    synchronized void check() throws IOException {
+      if (thrown != null) {
+        throw new IOException(thrown);
+      }
+    }
+
+    synchronized void throwException4Close() throws IOException {
+      check();
+      final IOException ioe = new ClosedChannelException();
+      thrown = ioe;
+      throw ioe;
+    }
+  }
 
   private volatile boolean streamerClosed = false;
   private ExtendedBlock block; // its length is number of bytes acked
@@ -178,7 +211,6 @@ class DataStreamer extends Daemon {
   private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
-  private String[] favoredNodes;
   volatile boolean hasError = false;
   volatile int errorIndex = -1;
   // Restarting node index
@@ -196,13 +228,13 @@ class DataStreamer extends Daemon {
   /** Has the current block been hflushed? */
   private boolean isHflushed = false;
   /** Append on an existing block? */
-  private boolean isAppend;
+  private final boolean isAppend;
 
   private long currentSeqno = 0;
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes written in current block
-  private final AtomicReference<IOException> lastException = new AtomicReference<>();
+  private final LastException lastException = new LastException();
   private Socket s;
 
   private final DFSClient dfsClient;
@@ -227,18 +259,20 @@ class DataStreamer extends Daemon {
   private long artificialSlowdown = 0;
   // List of congested data nodes. The stream will back off if the DataNodes
   // are congested
-  private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>();
+  private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
   private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
   private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
 
   private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
+  private final String[] favoredNodes;
 
   private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
                        AtomicReference<CachingStrategy> cachingStrategy,
-                       ByteArrayManager byteArrayManage){
+                       ByteArrayManager byteArrayManage,
+                       boolean isAppend, String[] favoredNodes) {
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -246,10 +280,12 @@ class DataStreamer extends Daemon {
     this.checksum4WriteBlock = checksum;
     this.cachingStrategy = cachingStrategy;
     this.byteArrayManager = byteArrayManage;
-    isLazyPersistFile = isLazyPersist(stat);
+    this.isLazyPersistFile = isLazyPersist(stat);
     this.dfsclientSlowLogThresholdMs =
         dfsClient.getConf().getSlowIoWarningThresholdMs();
-    excludedNodes = initExcludedNodes();
+    this.excludedNodes = initExcludedNodes();
+    this.isAppend = isAppend;
+    this.favoredNodes = favoredNodes;
   }
 
   /**
@@ -258,10 +294,9 @@ class DataStreamer extends Daemon {
   DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient,
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
-               ByteArrayManager byteArrayManage) {
+               ByteArrayManager byteArrayManage, String[] favoredNodes) {
     this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage);
-    isAppend = false;
+        byteArrayManage, false, favoredNodes);
     this.block = block;
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
@@ -277,8 +312,7 @@ class DataStreamer extends Daemon {
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage) throws IOException {
     this(stat, dfsClient, src, progress, checksum, cachingStrategy,
-        byteArrayManage);
-    isAppend = true;
+        byteArrayManage, true, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
     block = lastBlock.getBlock();
     bytesSent = block.getNumBytes();
@@ -314,15 +348,6 @@ class DataStreamer extends Daemon {
   }
 
   /**
-   * Set favored nodes
-   *
-   * @param favoredNodes favored nodes
-   */
-  void setFavoredNodes(String[] favoredNodes) {
-    this.favoredNodes = favoredNodes;
-  }
-
-  /**
    * Initialize for data streaming
    */
   private void initDataStreaming() {
@@ -334,8 +359,8 @@ class DataStreamer extends Daemon {
   }
 
   private void endBlock() {
-    if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Closing old block " + block);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Closing old block " + block);
     }
     this.setName("DataStreamer for file " + src);
     closeResponder();
@@ -360,7 +385,7 @@ class DataStreamer extends Daemon {
           response.join();
           response = null;
         } catch (InterruptedException  e) {
-          DFSClient.LOG.warn("Caught exception ", e);
+          LOG.warn("Caught exception", e);
         }
       }
 
@@ -388,7 +413,7 @@ class DataStreamer extends Daemon {
             try {
               dataQueue.wait(timeout);
             } catch (InterruptedException  e) {
-              DFSClient.LOG.warn("Caught exception ", e);
+              LOG.warn("Caught exception", e);
             }
             doSleep = false;
             now = Time.monotonicNow();
@@ -404,7 +429,7 @@ class DataStreamer extends Daemon {
             try {
               backOffIfNecessary();
             } catch (InterruptedException e) {
-              DFSClient.LOG.warn("Caught exception ", e);
+              LOG.warn("Caught exception", e);
             }
             one = dataQueue.getFirst(); // regular data packet
             long parents[] = one.getTraceParents();
@@ -419,14 +444,14 @@ class DataStreamer extends Daemon {
 
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          if(DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Allocating new block");
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Allocating new block");
           }
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-          if(DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("Append to block " + block);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Append to block " + block);
           }
           setupPipelineForAppendOrRecovery();
           initDataStreaming();
@@ -450,7 +475,7 @@ class DataStreamer extends Daemon {
                 // wait for acks to arrive from datanodes
                 dataQueue.wait(1000);
               } catch (InterruptedException  e) {
-                DFSClient.LOG.warn("Caught exception ", e);
+                LOG.warn("Caught exception", e);
               }
             }
           }
@@ -473,8 +498,8 @@ class DataStreamer extends Daemon {
           }
         }
 
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("DataStreamer block " + block +
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DataStreamer block " + block +
               " sending packet " + one);
         }
 
@@ -534,16 +559,12 @@ class DataStreamer extends Daemon {
           // Since their messages are descriptive enough, do not always
           // log a verbose stack-trace WARN for quota exceptions.
           if (e instanceof QuotaExceededException) {
-            DFSClient.LOG.debug("DataStreamer Quota Exception", e);
+            LOG.debug("DataStreamer Quota Exception", e);
           } else {
-            DFSClient.LOG.warn("DataStreamer Exception", e);
+            LOG.warn("DataStreamer Exception", e);
           }
         }
-        if (e instanceof IOException) {
-          setLastException((IOException)e);
-        } else {
-          setLastException(new IOException("DataStreamer Exception: ",e));
-        }
+        lastException.set(e);
         hasError = true;
         if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
           // Not a datanode issue
@@ -586,8 +607,8 @@ class DataStreamer extends Daemon {
   void waitForAckedSeqno(long seqno) throws IOException {
     TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
     try {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Waiting for ack for: " + seqno);
       }
       long begin = Time.monotonicNow();
       try {
@@ -611,7 +632,7 @@ class DataStreamer extends Daemon {
       }
       long duration = Time.monotonicNow() - begin;
       if (duration > dfsclientSlowLogThresholdMs) {
-        DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+        LOG.warn("Slow waitForAckedSeqno took " + duration
             + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
       }
     } finally {
@@ -688,8 +709,7 @@ class DataStreamer extends Daemon {
 
   private void checkClosed() throws IOException {
     if (streamerClosed) {
-      IOException e = lastException.get();
-      throw e != null ? e : new ClosedChannelException();
+      lastException.throwException4Close();
     }
   }
 
@@ -699,7 +719,7 @@ class DataStreamer extends Daemon {
         response.close();
         response.join();
       } catch (InterruptedException  e) {
-        DFSClient.LOG.warn("Caught exception ", e);
+        LOG.warn("Caught exception", e);
       } finally {
         response = null;
       }
@@ -707,11 +727,13 @@ class DataStreamer extends Daemon {
   }
 
   private void closeStream() {
+    final MultipleIOException.Builder b = new MultipleIOException.Builder();
+
     if (blockStream != null) {
       try {
         blockStream.close();
       } catch (IOException e) {
-        setLastException(e);
+        b.add(e);
       } finally {
         blockStream = null;
       }
@@ -720,7 +742,7 @@ class DataStreamer extends Daemon {
       try {
         blockReplyStream.close();
       } catch (IOException e) {
-        setLastException(e);
+        b.add(e);
       } finally {
         blockReplyStream = null;
       }
@@ -729,11 +751,16 @@ class DataStreamer extends Daemon {
       try {
         s.close();
       } catch (IOException e) {
-        setLastException(e);
+        b.add(e);
       } finally {
         s = null;
       }
     }
+
+    final IOException ioe = b.build();
+    if (ioe != null) {
+      lastException.set(ioe);
+    }
   }
 
   // The following synchronized methods are used whenever
@@ -825,12 +852,11 @@ class DataStreamer extends Daemon {
           long duration = Time.monotonicNow() - begin;
           if (duration > dfsclientSlowLogThresholdMs
               && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
-            DFSClient.LOG
-                .warn("Slow ReadProcessor read fields took " + duration
-                    + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
-                    + ack + ", targets: " + Arrays.asList(targets));
-          } else if (DFSClient.LOG.isDebugEnabled()) {
-            DFSClient.LOG.debug("DFSClient " + ack);
+            LOG.warn("Slow ReadProcessor read fields took " + duration
+                + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
+                + ack + ", targets: " + Arrays.asList(targets));
+          } else if (LOG.isDebugEnabled()) {
+            LOG.debug("DFSClient " + ack);
           }
 
           long seqno = ack.getSeqno();
@@ -851,7 +877,7 @@ class DataStreamer extends Daemon {
                   + Time.monotonicNow();
               setRestartingNodeIndex(i);
               String message = "A datanode is restarting: " + targets[i];
-              DFSClient.LOG.info(message);
+              LOG.info(message);
               throw new IOException(message);
             }
             // node error
@@ -917,9 +943,7 @@ class DataStreamer extends Daemon {
           }
         } catch (Exception e) {
           if (!responderClosed) {
-            if (e instanceof IOException) {
-              setLastException((IOException)e);
-            }
+            lastException.set(e);
             hasError = true;
             // If no explicit error report was received, mark the primary
             // node as failed.
@@ -928,8 +952,7 @@ class DataStreamer extends Daemon {
               dataQueue.notifyAll();
             }
             if (restartingNodeIndex.get() == -1) {
-              DFSClient.LOG.warn("DataStreamer ResponseProcessor exception "
-                  + " for block " + block, e);
+              LOG.warn("Exception for " + block, e);
             }
             responderClosed = true;
           }
@@ -951,7 +974,7 @@ class DataStreamer extends Daemon {
   //
   private boolean processDatanodeError() throws IOException {
     if (response != null) {
-      DFSClient.LOG.info("Error Recovery for " + block +
+      LOG.info("Error Recovery for " + block +
           " waiting for responder to exit. ");
       return true;
     }
@@ -972,7 +995,7 @@ class DataStreamer extends Daemon {
       // same packet, this client likely has corrupt data or corrupting
       // during transmission.
       if (++pipelineRecoveryCount > 5) {
-        DFSClient.LOG.warn("Error recovering pipeline for writing " +
+        LOG.warn("Error recovering pipeline for writing " +
             block + ". Already retried 5 times for the same packet.");
         lastException.set(new IOException("Failing write. Tried pipeline " +
             "recovery 5 times without success."));
@@ -1147,8 +1170,8 @@ class DataStreamer extends Daemon {
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " + "Source file \""
           + src + "\" - Aborting...";
-      DFSClient.LOG.warn(msg);
-      setLastException(new IOException(msg));
+      LOG.warn(msg);
+      lastException.set(new IOException(msg));
       streamerClosed = true;
       return false;
     }
@@ -1193,7 +1216,7 @@ class DataStreamer extends Daemon {
           streamerClosed = true;
           return false;
         }
-        DFSClient.LOG.warn("Error Recovery for block " + block +
+        LOG.warn("Error Recovery for block " + block +
             " in pipeline " + pipelineMsg +
             ": bad datanode " + nodes[errorIndex]);
         failed.add(nodes[errorIndex]);
@@ -1227,7 +1250,7 @@ class DataStreamer extends Daemon {
         if (restartingNodeIndex.get() == -1) {
           hasError = false;
         }
-        lastException.set(null);
+        lastException.clear();
         errorIndex = -1;
       }
 
@@ -1240,7 +1263,7 @@ class DataStreamer extends Daemon {
           if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
             throw ioe;
           }
-          DFSClient.LOG.warn("Failed to replace datanode."
+          LOG.warn("Failed to replace datanode."
               + " Continue with the remaining datanodes since "
               + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY
               + " is set to true.", ioe);
@@ -1281,7 +1304,7 @@ class DataStreamer extends Daemon {
         restartDeadline = 0;
         int expiredNodeIndex = restartingNodeIndex.get();
         restartingNodeIndex.set(-1);
-        DFSClient.LOG.warn("Datanode did not restart in time: " +
+        LOG.warn("Datanode did not restart in time: " +
             nodes[expiredNodeIndex]);
         // Mark the restarting node as failed. If there is any other failed
         // node during the last pipeline construction attempt, it will not be
@@ -1321,7 +1344,7 @@ class DataStreamer extends Daemon {
     ExtendedBlock oldBlock = block;
     do {
       hasError = false;
-      lastException.set(null);
+      lastException.clear();
       errorIndex = -1;
       success = false;
 
@@ -1344,11 +1367,11 @@ class DataStreamer extends Daemon {
       success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
       if (!success) {
-        DFSClient.LOG.info("Abandoning " + block);
+        LOG.info("Abandoning " + block);
         dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
             dfsClient.clientName);
         block = null;
-        DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
+        LOG.info("Excluding datanode " + nodes[errorIndex]);
         excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
       }
     } while (!success && --count >= 0);
@@ -1365,17 +1388,14 @@ class DataStreamer extends Daemon {
   private boolean createBlockOutputStream(DatanodeInfo[] nodes,
       StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
     if (nodes.length == 0) {
-      DFSClient.LOG.info("nodes are empty for write pipeline of block "
-          + block);
+      LOG.info("nodes are empty for write pipeline of " + block);
       return false;
     }
     Status pipelineStatus = SUCCESS;
     String firstBadLink = "";
     boolean checkRestart = false;
-    if (DFSClient.LOG.isDebugEnabled()) {
-      for (int i = 0; i < nodes.length; i++) {
-        DFSClient.LOG.debug("pipeline = " + nodes[i]);
-      }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("pipeline = " + Arrays.asList(nodes));
     }
 
     // persist blocks on namenode on next flush
@@ -1447,10 +1467,10 @@ class DataStreamer extends Daemon {
         hasError = false;
       } catch (IOException ie) {
         if (restartingNodeIndex.get() == -1) {
-          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          LOG.info("Exception in createBlockOutputStream", ie);
         }
         if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+          LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to "
               + nodes[0] + " : " + ie);
           // The encryption key used is invalid.
@@ -1480,11 +1500,11 @@ class DataStreamer extends Daemon {
               + Time.monotonicNow();
           restartingNodeIndex.set(errorIndex);
           errorIndex = -1;
-          DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
+          LOG.info("Waiting for the datanode to be restarted: " +
               nodes[restartingNodeIndex.get()]);
         }
         hasError = true;
-        setLastException(ie);
+        lastException.set(ie);
         result =  false;  // error
       } finally {
         if (!result) {
@@ -1509,18 +1529,16 @@ class DataStreamer extends Daemon {
           new HashSet<String>(Arrays.asList(favoredNodes));
       for (int i = 0; i < nodes.length; i++) {
         pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
-        if (DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() +
-              " was chosen by name node (favored=" + pinnings[i] +
-              ").");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(nodes[i].getXferAddrWithHostname() +
+              " was chosen by name node (favored=" + pinnings[i] + ").");
         }
       }
       if (shouldLog && !favoredSet.isEmpty()) {
         // There is one or more favored nodes that were not allocated.
-        DFSClient.LOG.warn(
-            "These favored nodes were specified but not chosen: " +
-                favoredSet +
-                " Specified favored nodes: " + Arrays.toString(favoredNodes));
+        LOG.warn("These favored nodes were specified but not chosen: "
+            + favoredSet + " Specified favored nodes: "
+            + Arrays.toString(favoredNodes));
 
       }
       return pinnings;
@@ -1557,19 +1575,19 @@ class DataStreamer extends Daemon {
               throw e;
             } else {
               --retries;
-              DFSClient.LOG.info("Exception while adding a block", e);
+              LOG.info("Exception while adding a block", e);
               long elapsed = Time.monotonicNow() - localstart;
               if (elapsed > 5000) {
-                DFSClient.LOG.info("Waiting for replication for "
+                LOG.info("Waiting for replication for "
                     + (elapsed / 1000) + " seconds");
               }
               try {
-                DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
+                LOG.warn("NotReplicatedYetException sleeping " + src
                     + " retries left " + retries);
                 Thread.sleep(sleeptime);
                 sleeptime *= 2;
               } catch (InterruptedException ie) {
-                DFSClient.LOG.warn("Caught exception ", ie);
+                LOG.warn("Caught exception", ie);
               }
             }
           } else {
@@ -1606,7 +1624,7 @@ class DataStreamer extends Daemon {
                      (int)(base + Math.random() * range));
         lastCongestionBackoffTime = t;
         sb.append(" are congested. Backing off for ").append(t).append(" ms");
-        DFSClient.LOG.info(sb.toString());
+        LOG.info(sb.toString());
         congestedNodes.clear();
       }
     }
@@ -1643,15 +1661,6 @@ class DataStreamer extends Daemon {
   }
 
   /**
-   * set last exception
-   *
-   * @param e an exception
-   */
-  void setLastException(IOException e) {
-    lastException.compareAndSet(null, e);
-  }
-
-  /**
    * Put a packet to the data queue
    *
    * @param packet the packet to be put into the data queued
@@ -1662,8 +1671,8 @@ class DataStreamer extends Daemon {
       packet.addTraceParent(Trace.currentSpan());
       dataQueue.addLast(packet);
       lastQueuedSeqno = packet.getSeqno();
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("Queued packet " + packet.getSeqno());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Queued packet " + packet.getSeqno());
       }
       dataQueue.notifyAll();
     }
@@ -1686,7 +1695,7 @@ class DataStreamer extends Daemon {
           @Override
           public void onRemoval(
               RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
-            DFSClient.LOG.info("Removing node " + notification.getKey()
+            LOG.info("Removing node " + notification.getKey()
                 + " from the excluded nodes list");
           }
         }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
@@ -1730,11 +1739,9 @@ class DataStreamer extends Daemon {
   }
 
   /**
-   * get the last exception
-   *
    * @return the last exception
    */
-  AtomicReference<IOException> getLastException(){
+  LastException getLastException(){
     return lastException;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7947e5b5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 478f7e5..eac1fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -62,7 +62,6 @@ public class TestDFSOutputStream {
     FSDataOutputStream os = fs.create(new Path("/test"));
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
         "wrappedStream");
-    @SuppressWarnings("unchecked")
     DataStreamer streamer = (DataStreamer) Whitebox
         .getInternalState(dos, "streamer");
     @SuppressWarnings("unchecked")
@@ -122,7 +121,7 @@ public class TestDFSOutputStream {
         mock(HdfsFileStatus.class),
         mock(ExtendedBlock.class),
         client,
-        "foo", null, null, null, null);
+        "foo", null, null, null, null, null);
 
     DataOutputStream blockStream = mock(DataOutputStream.class);
     doThrow(new IOException()).when(blockStream).flush();