You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/05/19 08:57:33 UTC

hadoop git commit: HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker. Contributed by Rakesh R.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 3cf3398f3 -> 3676277c1


HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3676277c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3676277c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3676277c

Branch: refs/heads/HDFS-7285
Commit: 3676277c15d341be758738d975f426119605835f
Parents: 3cf3398
Author: Walter Su <wa...@apache.org>
Authored: Tue May 19 14:59:23 2015 +0800
Committer: Walter Su <wa...@apache.org>
Committed: Tue May 19 14:59:23 2015 +0800

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  5 ++
 .../hdfs/server/datanode/BPOfferService.java    |  1 +
 .../erasurecode/ErasureCodingWorker.java        | 59 ++++++++++----------
 3 files changed, 35 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3676277c/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 939ba89..1e7dbea 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -218,6 +218,8 @@
 
     HDFS-8367. BlockInfoStriped uses EC schema. (Kai Sasaki via Kai Zheng)
 
+    HDFS-8352. Erasure Coding: test webhdfs read write stripe file. (waltersu4549)
+
     HDFS-8417. Erasure Coding: Pread failed to read data starting from not-first stripe.
     (Walter Su via jing9)
 
@@ -228,3 +230,6 @@
 	
 	HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue 
 	configurable in DFSStripedOutputStream. (Li Bo)
+
+    HDFS-8378. Erasure Coding: Few improvements for the erasure coding worker.
+    (Rakesh R via waltersu4549)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3676277c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 6606d0b..d77b36d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -728,6 +728,7 @@ class BPOfferService {
       LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
       Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks();
       dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3676277c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index a1c0f72..4723e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
@@ -88,12 +87,12 @@ import com.google.common.base.Preconditions;
  * commands.
  */
 public final class ErasureCodingWorker {
-  private final Log LOG = DataNode.LOG;
+  private static final Log LOG = DataNode.LOG;
   
   private final DataNode datanode; 
-  private Configuration conf;
+  private final Configuration conf;
 
-  private ThreadPoolExecutor STRIPED_READ_TRHEAD_POOL;
+  private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int STRIPED_READ_THRESHOLD_MILLIS;
   private final int STRIPED_READ_BUFFER_SIZE;
 
@@ -121,7 +120,10 @@ public final class ErasureCodingWorker {
   }
 
   private void initializeStripedReadThreadPool(int num) {
-    STRIPED_READ_TRHEAD_POOL = new ThreadPoolExecutor(1, num, 60,
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using striped reads; pool threads=" + num);
+    }
+    STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
       private final AtomicInteger threadIndex = new AtomicInteger(0);
@@ -141,7 +143,7 @@ public final class ErasureCodingWorker {
         super.rejectedExecution(runnable, e);
       }
     });
-    STRIPED_READ_TRHEAD_POOL.allowCoreThreadTimeOut(true);
+    STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
   }
 
   /**
@@ -231,23 +233,23 @@ public final class ErasureCodingWorker {
 
     // sources
     private final short[] liveIndices;
-    private DatanodeInfo[] sources;
+    private final DatanodeInfo[] sources;
 
-    private List<StripedReader> stripedReaders;
+    private final List<StripedReader> stripedReaders;
 
     // targets
-    private DatanodeInfo[] targets;
-    private StorageType[] targetStorageTypes;
+    private final DatanodeInfo[] targets;
+    private final StorageType[] targetStorageTypes;
 
-    private short[] targetIndices;
-    private ByteBuffer[] targetBuffers;
+    private final short[] targetIndices;
+    private final ByteBuffer[] targetBuffers;
 
-    private Socket[] targetSockets;
-    private DataOutputStream[] targetOutputStreams;
-    private DataInputStream[] targetInputStreams;
+    private final Socket[] targetSockets;
+    private final DataOutputStream[] targetOutputStreams;
+    private final DataInputStream[] targetInputStreams;
 
-    private long[] blockOffset4Targets;
-    private long[] seqNo4Targets;
+    private final long[] blockOffset4Targets;
+    private final long[] seqNo4Targets;
 
     private final int WRITE_PACKET_SIZE = 64 * 1024;
     private DataChecksum checksum;
@@ -257,11 +259,11 @@ public final class ErasureCodingWorker {
     private int bytesPerChecksum;
     private int checksumSize;
 
-    private CachingStrategy cachingStrategy;
+    private final CachingStrategy cachingStrategy;
 
-    private Map<Future<Void>, Integer> futures = new HashMap<>();
-    private CompletionService<Void> readService =
-        new ExecutorCompletionService<>(STRIPED_READ_TRHEAD_POOL);
+    private final Map<Future<Void>, Integer> futures = new HashMap<>();
+    private final CompletionService<Void> readService =
+        new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL);
 
     ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) {
       ECSchema schema = recoveryInfo.getECSchema();
@@ -277,7 +279,8 @@ public final class ErasureCodingWorker {
 
       Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
           "No enough live striped blocks.");
-      Preconditions.checkArgument(liveIndices.length == sources.length);
+      Preconditions.checkArgument(liveIndices.length == sources.length,
+          "liveBlockIndices and source dns should match");
 
       targets = recoveryInfo.getTargetDnInfos();
       targetStorageTypes = recoveryInfo.getTargetStorageTypes();
@@ -336,7 +339,6 @@ public final class ErasureCodingWorker {
         if (nsuccess < dataBlkNum) {
           String error = "Can't find minimum sources required by "
               + "recovery, block id: " + blockGroup.getBlockId();
-          LOG.warn(error);
           throw new IOException(error);
         }
 
@@ -358,7 +360,6 @@ public final class ErasureCodingWorker {
         boolean[] targetsStatus = new boolean[targets.length];
         if (initTargetStreams(targetsStatus) == 0) {
           String error = "All targets are failed.";
-          LOG.warn(error);
           throw new IOException(error);
         }
 
@@ -372,7 +373,6 @@ public final class ErasureCodingWorker {
           if (nsuccess < dataBlkNum) {
             String error = "Can't read data from minimum number of sources "
                 + "required by recovery, block id: " + blockGroup.getBlockId();
-            LOG.warn(error);
             throw new IOException(error);
           }
 
@@ -385,7 +385,6 @@ public final class ErasureCodingWorker {
           // step3: transfer data
           if (transferData2Targets(targetsStatus) == 0) {
             String error = "Transfer failed for all targets.";
-            LOG.warn(error);
             throw new IOException(error);
           }
 
@@ -906,11 +905,11 @@ public final class ErasureCodingWorker {
   }
 
   private class StripedReader {
-    short index;
-    BlockReader blockReader;
-    ByteBuffer buffer;
+    private final short index;
+    private BlockReader blockReader;
+    private ByteBuffer buffer;
 
-    public StripedReader(short index) {
+    private StripedReader(short index) {
       this.index = index;
     }
   }