You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:47 UTC

[42/50] [abbrv] hadoop git commit: HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to hadoop-hdfs-client module. Contributed by Mingliang Liu.

HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to hadoop-hdfs-client module. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d16c4eee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d16c4eee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d16c4eee

Branch: refs/heads/YARN-1197
Commit: d16c4eee186492608ffeb1c2e83f437000cc64f6
Parents: 6eaca2e
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Sep 4 10:41:09 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Sep 4 10:41:09 2015 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/datatransfer/PipelineAck.java | 243 ++++++++++++++++
 .../apache/hadoop/hdfs/util/LongBitFormat.java  |  71 +++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/protocol/datatransfer/PipelineAck.java | 274 -------------------
 .../hdfs/server/datanode/BlockReceiver.java     |   2 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  38 +++
 .../apache/hadoop/hdfs/util/LongBitFormat.java  |  71 -----
 7 files changed, 356 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
new file mode 100644
index 0000000..3836606
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
+
+/** Pipeline Acknowledgment **/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PipelineAck {
+  PipelineAckProto proto;
+  public final static long UNKOWN_SEQNO = -2;
+  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
+  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
+
+  public enum ECN {
+    DISABLED(0),
+    SUPPORTED(1),
+    SUPPORTED2(2),
+    CONGESTED(3);
+
+    private final int value;
+    private static final ECN[] VALUES = values();
+    static ECN valueOf(int value) {
+      return VALUES[value];
+    }
+
+    ECN(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
+  private enum StatusFormat {
+    STATUS(null, 4),
+    RESERVED(STATUS.BITS, 1),
+    ECN_BITS(RESERVED.BITS, 2);
+
+    private final LongBitFormat BITS;
+
+    StatusFormat(LongBitFormat prev, int bits) {
+      BITS = new LongBitFormat(name(), prev, bits, 0);
+    }
+
+    static Status getStatus(int header) {
+      return Status.valueOf((int) STATUS.BITS.retrieve(header));
+    }
+
+    static ECN getECN(int header) {
+      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+    }
+
+    public static int setStatus(int old, Status status) {
+      return (int) STATUS.BITS.combine(status.getNumber(), old);
+    }
+
+    public static int setECN(int old, ECN ecn) {
+      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+    }
+  }
+
+  /** default constructor **/
+  public PipelineAck() {
+  }
+  
+  /**
+   * Constructor assuming no next DN in pipeline
+   * @param seqno sequence number
+   * @param replies an array of replies
+   */
+  public PipelineAck(long seqno, int[] replies) {
+    this(seqno, replies, 0L);
+  }
+
+  /**
+   * Constructor
+   * @param seqno sequence number
+   * @param replies an array of replies
+   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
+   */
+  public PipelineAck(long seqno, int[] replies,
+                     long downstreamAckTimeNanos) {
+    ArrayList<Status> statusList = Lists.newArrayList();
+    ArrayList<Integer> flagList = Lists.newArrayList();
+    for (int r : replies) {
+      statusList.add(StatusFormat.getStatus(r));
+      flagList.add(r);
+    }
+    proto = PipelineAckProto.newBuilder()
+      .setSeqno(seqno)
+      .addAllReply(statusList)
+      .addAllFlag(flagList)
+      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
+      .build();
+  }
+  
+  /**
+   * Get the sequence number
+   * @return the sequence number
+   */
+  public long getSeqno() {
+    return proto.getSeqno();
+  }
+  
+  /**
+   * Get the number of replies
+   * @return the number of replies
+   */
+  public short getNumOfReplies() {
+    return (short)proto.getReplyCount();
+  }
+  
+  /**
+   * Get the ith header flag (built from the ith reply if the ack has none)
+   */
+  public int getHeaderFlag(int i) {
+    if (proto.getFlagCount() > 0) {
+      return proto.getFlag(i);
+    } else {
+      return combineHeader(ECN.DISABLED, proto.getReply(i));
+    }
+  }
+
+  public int getFlag(int i) {
+    return proto.getFlag(i);
+  }
+
+  /**
+   * Get the time elapsed for downstream ack RTT in nanoseconds
+   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
+   */
+  public long getDownstreamAckTimeNanos() {
+    return proto.getDownstreamAckTimeNanos();
+  }
+
+  /**
+   * Check whether this ack is free of error statuses
+   * @return true if all statuses are SUCCESS
+   */
+  public boolean isSuccess() {
+    for (Status s : proto.getReplyList()) {
+      if (s != Status.SUCCESS) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the OOB status if this ack contains one. 
+   * @return null if it is not an OOB ack.
+   */
+  public Status getOOBStatus() {
+    // Normal data transfer acks will have a valid sequence number, so
+    // this will return right away in most cases.
+    if (getSeqno() != UNKOWN_SEQNO) {
+      return null;
+    }
+    for (Status s : proto.getReplyList()) {
+      // The following check is valid because protobuf guarantees to
+      // preserve the ordering of enum elements.
+      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  /** Get the Restart OOB ack status */
+  public static Status getRestartOOBStatus() {
+    return Status.OOB_RESTART;
+  }
+
+  /** return true if it is the restart OOB status code  */
+  public static boolean isRestartOOBStatus(Status st) {
+    return st.equals(Status.OOB_RESTART);
+  }
+
+  /**** Writable interface ****/
+  public void readFields(InputStream in) throws IOException {
+    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
+  }
+
+  public void write(OutputStream out) throws IOException {
+    proto.writeDelimitedTo(out);
+  }
+  
+  @Override //Object
+  public String toString() {
+    return TextFormat.shortDebugString(proto);
+  }
+
+  public static Status getStatusFromHeader(int header) {
+    return StatusFormat.getStatus(header);
+  }
+
+  public static ECN getECNFromHeader(int header) {
+    return StatusFormat.getECN(header);
+  }
+
+  public static int setStatusForHeader(int old, Status status) {
+    return StatusFormat.setStatus(old, status);
+  }
+
+  public static int combineHeader(ECN ecn, Status status) {
+    int header = 0;
+    header = StatusFormat.setStatus(header, status);
+    header = StatusFormat.setECN(header, ecn);
+    return header;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
new file mode 100644
index 0000000..9399d84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Serializable;
+
+
+/**
+ * A bit field within a long, defined by its bit offset, length and value range.
+ */
+public class LongBitFormat implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final String NAME;
+  /** Bit offset */
+  private final int OFFSET;
+  /** Bit length */
+  private final int LENGTH;
+  /** Minimum value */
+  private final long MIN;
+  /** Maximum value */
+  private final long MAX;
+  /** Bit mask */
+  private final long MASK;
+
+  public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
+    NAME = name;
+    OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH;
+    LENGTH = length;
+    MIN = min;
+    MAX = ((-1L) >>> (64 - LENGTH));
+    MASK = MAX << OFFSET;
+  }
+
+  /** Retrieve the value from the record. */
+  public long retrieve(long record) {
+    return (record & MASK) >>> OFFSET;
+  }
+
+  /** Store the value in the record; it must lie within [MIN, MAX]. */
+  public long combine(long value, long record) {
+    if (value < MIN) {
+      throw new IllegalArgumentException(
+          "Illagal value: " + NAME + " = " + value + " < MIN = " + MIN);
+    }
+    if (value > MAX) {
+      throw new IllegalArgumentException(
+          "Illagal value: " + NAME + " = " + value + " > MAX = " + MAX);
+    }
+    return (record & ~MASK) | (value << OFFSET);
+  }
+  
+  public long getMin() {
+    return MIN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9b89aa..e67c9d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -888,6 +888,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9021. Use a yellow elephant rather than a blue one in diagram. (wang)
 
+    HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to
+    hadoop-hdfs-client module. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
deleted file mode 100644
index 44f38c6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
-
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import com.google.protobuf.TextFormat;
-import org.apache.hadoop.hdfs.util.LongBitFormat;
-
-/** Pipeline Acknowledgment **/
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PipelineAck {
-  PipelineAckProto proto;
-  public final static long UNKOWN_SEQNO = -2;
-  final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
-  final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
-  final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1;
-  // place holder for timeout value of each OOB type
-  final static long[] OOB_TIMEOUT;
-
-  public enum ECN {
-    DISABLED(0),
-    SUPPORTED(1),
-    SUPPORTED2(2),
-    CONGESTED(3);
-
-    private final int value;
-    private static final ECN[] VALUES = values();
-    static ECN valueOf(int value) {
-      return VALUES[value];
-    }
-
-    ECN(int value) {
-      this.value = value;
-    }
-
-    public int getValue() {
-      return value;
-    }
-  }
-
-  private enum StatusFormat {
-    STATUS(null, 4),
-    RESERVED(STATUS.BITS, 1),
-    ECN_BITS(RESERVED.BITS, 2);
-
-    private final LongBitFormat BITS;
-
-    StatusFormat(LongBitFormat prev, int bits) {
-      BITS = new LongBitFormat(name(), prev, bits, 0);
-    }
-
-    static Status getStatus(int header) {
-      return Status.valueOf((int) STATUS.BITS.retrieve(header));
-    }
-
-    static ECN getECN(int header) {
-      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
-    }
-
-    public static int setStatus(int old, Status status) {
-      return (int) STATUS.BITS.combine(status.getNumber(), old);
-    }
-
-    public static int setECN(int old, ECN ecn) {
-      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
-    }
-  }
-
-  static {
-    OOB_TIMEOUT = new long[NUM_OOB_TYPES];
-    HdfsConfiguration conf = new HdfsConfiguration();
-    String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
-        DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
-    for (int i = 0; i < NUM_OOB_TYPES; i++) {
-      OOB_TIMEOUT[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
-    }
-  }
-
-  /** default constructor **/
-  public PipelineAck() {
-  }
-  
-  /**
-   * Constructor assuming no next DN in pipeline
-   * @param seqno sequence number
-   * @param replies an array of replies
-   */
-  public PipelineAck(long seqno, int[] replies) {
-    this(seqno, replies, 0L);
-  }
-
-  /**
-   * Constructor
-   * @param seqno sequence number
-   * @param replies an array of replies
-   * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
-   */
-  public PipelineAck(long seqno, int[] replies,
-                     long downstreamAckTimeNanos) {
-    ArrayList<Status> statusList = Lists.newArrayList();
-    ArrayList<Integer> flagList = Lists.newArrayList();
-    for (int r : replies) {
-      statusList.add(StatusFormat.getStatus(r));
-      flagList.add(r);
-    }
-    proto = PipelineAckProto.newBuilder()
-      .setSeqno(seqno)
-      .addAllReply(statusList)
-      .addAllFlag(flagList)
-      .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
-      .build();
-  }
-  
-  /**
-   * Get the sequence number
-   * @return the sequence number
-   */
-  public long getSeqno() {
-    return proto.getSeqno();
-  }
-  
-  /**
-   * Get the number of replies
-   * @return the number of replies
-   */
-  public short getNumOfReplies() {
-    return (short)proto.getReplyCount();
-  }
-  
-  /**
-   * get the header flag of ith reply
-   */
-  public int getHeaderFlag(int i) {
-    if (proto.getFlagCount() > 0) {
-      return proto.getFlag(i);
-    } else {
-      return combineHeader(ECN.DISABLED, proto.getReply(i));
-    }
-  }
-
-  public int getFlag(int i) {
-    return proto.getFlag(i);
-  }
-
-  /**
-   * Get the time elapsed for downstream ack RTT in nanoseconds
-   * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
-   */
-  public long getDownstreamAckTimeNanos() {
-    return proto.getDownstreamAckTimeNanos();
-  }
-
-  /**
-   * Check if this ack contains error status
-   * @return true if all statuses are SUCCESS
-   */
-  public boolean isSuccess() {
-    for (Status s : proto.getReplyList()) {
-      if (s != Status.SUCCESS) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Returns the OOB status if this ack contains one. 
-   * @return null if it is not an OOB ack.
-   */
-  public Status getOOBStatus() {
-    // Normal data transfer acks will have a valid sequence number, so
-    // this will return right away in most cases.
-    if (getSeqno() != UNKOWN_SEQNO) {
-      return null;
-    }
-    for (Status s : proto.getReplyList()) {
-      // The following check is valid because protobuf guarantees to
-      // preserve the ordering of enum elements.
-      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
-        return s;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Get the timeout to be used for transmitting the OOB type
-   * @return the timeout in milliseconds
-   */
-  public static long getOOBTimeout(Status status) throws IOException {
-    int index = status.getNumber() - OOB_START;
-    if (index >= 0 && index < NUM_OOB_TYPES) {
-      return OOB_TIMEOUT[index];
-    } 
-    // Not an OOB.
-    throw new IOException("Not an OOB status: " + status);
-  }
-
-  /** Get the Restart OOB ack status */
-  public static Status getRestartOOBStatus() {
-    return Status.OOB_RESTART;
-  }
-
-  /** return true if it is the restart OOB status code  */
-  public static boolean isRestartOOBStatus(Status st) {
-    return st.equals(Status.OOB_RESTART);
-  }
-
-  /**** Writable interface ****/
-  public void readFields(InputStream in) throws IOException {
-    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
-  }
-
-  public void write(OutputStream out) throws IOException {
-    proto.writeDelimitedTo(out);
-  }
-  
-  @Override //Object
-  public String toString() {
-    return TextFormat.shortDebugString(proto);
-  }
-
-  public static Status getStatusFromHeader(int header) {
-    return StatusFormat.getStatus(header);
-  }
-
-  public static ECN getECNFromHeader(int header) {
-    return StatusFormat.getECN(header);
-  }
-
-  public static int setStatusForHeader(int old, Status status) {
-    return StatusFormat.setStatus(old, status);
-  }
-
-  public static int combineHeader(ECN ecn, Status status) {
-    int header = 0;
-    header = StatusFormat.setStatus(header, status);
-    header = StatusFormat.setECN(header, ecn);
-    return header;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 1cb308f..bc5396f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1153,7 +1153,7 @@ class BlockReceiver implements Closeable {
 
       synchronized(this) {
         if (sending) {
-          wait(PipelineAck.getOOBTimeout(ackStatus));
+          wait(datanode.getOOBTimeout(ackStatus));
           // Didn't get my turn in time. Give up.
           if (sending) {
             throw new IOException("Could not send OOB reponse in time: "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 42cbd96..e0adc6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
@@ -359,6 +361,8 @@ public class DataNode extends ReconfigurableBase
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
 
+  private long[] oobTimeouts; /** timeout value of each OOB type */
+
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -373,6 +377,7 @@ public class DataNode extends ReconfigurableBase
     this.connectToDnViaHostname = false;
     this.blockScanner = new BlockScanner(this, conf);
     this.pipelineSupportECN = false;
+    initOOBTimeout();
   }
 
   /**
@@ -446,6 +451,8 @@ public class DataNode extends ReconfigurableBase
                 return ret;
               }
             });
+
+    initOOBTimeout();
   }
 
   @Override  // ReconfigurableBase
@@ -3226,4 +3233,35 @@ public class DataNode extends ReconfigurableBase
     checkSuperuserPrivilege();
     spanReceiverHost.removeSpanReceiver(id);
   }
+
+  /**
+   * Load the timeout of each OOB type from configuration; missing ones are 0
+   */
+  private void initOOBTimeout() {
+    final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type
+    final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type
+    final int numOobTypes = oobEnd - oobStart + 1;
+    oobTimeouts = new long[numOobTypes];
+
+    final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+        DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
+    for (int i = 0; i < numOobTypes; i++) {
+      oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
+    }
+  }
+
+  /**
+   * Get the timeout to be used for transmitting the OOB type
+   * @return the timeout in milliseconds
+   */
+  public long getOOBTimeout(Status status)
+      throws IOException {
+    if (status.getNumber() < Status.OOB_RESTART_VALUE ||
+        status.getNumber() > Status.OOB_RESERVED3_VALUE) {
+      // Not an OOB.
+      throw new IOException("Not an OOB status: " + status);
+    }
+
+    return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE];
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
deleted file mode 100644
index 9399d84..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.Serializable;
-
-
-/**
- * Bit format in a long.
- */
-public class LongBitFormat implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-  private final String NAME;
-  /** Bit offset */
-  private final int OFFSET;
-  /** Bit length */
-  private final int LENGTH;
-  /** Minimum value */
-  private final long MIN;
-  /** Maximum value */
-  private final long MAX;
-  /** Bit mask */
-  private final long MASK;
-
-  public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
-    NAME = name;
-    OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH;
-    LENGTH = length;
-    MIN = min;
-    MAX = ((-1L) >>> (64 - LENGTH));
-    MASK = MAX << OFFSET;
-  }
-
-  /** Retrieve the value from the record. */
-  public long retrieve(long record) {
-    return (record & MASK) >>> OFFSET;
-  }
-
-  /** Combine the value to the record. */
-  public long combine(long value, long record) {
-    if (value < MIN) {
-      throw new IllegalArgumentException(
-          "Illagal value: " + NAME + " = " + value + " < MIN = " + MIN);
-    }
-    if (value > MAX) {
-      throw new IllegalArgumentException(
-          "Illagal value: " + NAME + " = " + value + " > MAX = " + MAX);
-    }
-    return (record & ~MASK) | (value << OFFSET);
-  }
-  
-  public long getMin() {
-    return MIN;
-  }
-}