You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/04 23:42:47 UTC
[42/50] [abbrv] hadoop git commit: HDFS-9012. Move
o.a.h.hdfs.protocol.datatransfer.PipelineAck class to hadoop-hdfs-client
module. Contributed by Mingliang Liu.
HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to hadoop-hdfs-client module. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d16c4eee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d16c4eee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d16c4eee
Branch: refs/heads/YARN-1197
Commit: d16c4eee186492608ffeb1c2e83f437000cc64f6
Parents: 6eaca2e
Author: Haohui Mai <wh...@apache.org>
Authored: Fri Sep 4 10:41:09 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Fri Sep 4 10:41:09 2015 -0700
----------------------------------------------------------------------
.../hdfs/protocol/datatransfer/PipelineAck.java | 243 ++++++++++++++++
.../apache/hadoop/hdfs/util/LongBitFormat.java | 71 +++++
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/protocol/datatransfer/PipelineAck.java | 274 -------------------
.../hdfs/server/datanode/BlockReceiver.java | 2 +-
.../hadoop/hdfs/server/datanode/DataNode.java | 38 +++
.../apache/hadoop/hdfs/util/LongBitFormat.java | 71 -----
7 files changed, 356 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
new file mode 100644
index 0000000..3836606
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
+
+/** Pipeline Acknowledgment: wraps a {@link PipelineAckProto} holding a seqno, per-reply statuses and header flags, and the downstream ack time. **/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PipelineAck {
+ PipelineAckProto proto; // left unset by the default constructor; assigned by the other constructors and by readFields()
+ public final static long UNKOWN_SEQNO = -2; // getOOBStatus() only inspects acks carrying this seqno; the misspelling is part of the public name
+ final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
+ final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
+
+ public enum ECN { // values 0..3, which fit the 2-bit ECN_BITS header field declared in StatusFormat
+ DISABLED(0),
+ SUPPORTED(1),
+ SUPPORTED2(2),
+ CONGESTED(3);
+
+ private final int value;
+ private static final ECN[] VALUES = values();
+ static ECN valueOf(int value) { // indexes VALUES directly, so it relies on each constant's value matching its declaration position
+ return VALUES[value];
+ }
+
+ ECN(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+ }
+
+ private enum StatusFormat { // header bit layout, low to high: 4-bit status, 1 reserved bit, 2-bit ECN
+ STATUS(null, 4),
+ RESERVED(STATUS.BITS, 1),
+ ECN_BITS(RESERVED.BITS, 2);
+
+ private final LongBitFormat BITS;
+
+ StatusFormat(LongBitFormat prev, int bits) {
+ BITS = new LongBitFormat(name(), prev, bits, 0);
+ }
+
+ static Status getStatus(int header) {
+ return Status.valueOf((int) STATUS.BITS.retrieve(header)); // NOTE(review): presumably null when no Status constant has this number - confirm against the generated enum
+ }
+
+ static ECN getECN(int header) {
+ return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+ }
+
+ public static int setStatus(int old, Status status) {
+ return (int) STATUS.BITS.combine(status.getNumber(), old);
+ }
+
+ public static int setECN(int old, ECN ecn) {
+ return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+ }
+ }
+
+ /** default constructor: leaves proto unset ({@link #readFields} is the method that assigns it) **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Constructor assuming no next DN in pipeline (passes 0 as the downstream ack time)
+ * @param seqno sequence number
+ * @param replies an array of replies, each a packed header (see combineHeader)
+ */
+ public PipelineAck(long seqno, int[] replies) {
+ this(seqno, replies, 0L);
+ }
+
+ /**
+ * Constructor: stores each header in the flag list and its decoded status in the reply list
+ * @param seqno sequence number
+ * @param replies an array of replies, each a packed header whose status bits are decoded via StatusFormat
+ * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
+ */
+ public PipelineAck(long seqno, int[] replies,
+ long downstreamAckTimeNanos) {
+ ArrayList<Status> statusList = Lists.newArrayList();
+ ArrayList<Integer> flagList = Lists.newArrayList();
+ for (int r : replies) {
+ statusList.add(StatusFormat.getStatus(r));
+ flagList.add(r);
+ }
+ proto = PipelineAckProto.newBuilder()
+ .setSeqno(seqno)
+ .addAllReply(statusList)
+ .addAllFlag(flagList)
+ .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
+ .build();
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return proto.getSeqno();
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)proto.getReplyCount();
+ }
+
+ /**
+ * get the header flag of ith reply; when the ack carries no flags, one is built from the ith reply status with ECN.DISABLED
+ */
+ public int getHeaderFlag(int i) {
+ if (proto.getFlagCount() > 0) {
+ return proto.getFlag(i);
+ } else {
+ return combineHeader(ECN.DISABLED, proto.getReply(i));
+ }
+ }
+
+ public int getFlag(int i) { // raw accessor: unlike getHeaderFlag there is no fallback when the flag list is empty
+ return proto.getFlag(i);
+ }
+
+ /**
+ * Get the time elapsed for downstream ack RTT in nanoseconds
+ * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
+ */
+ public long getDownstreamAckTimeNanos() {
+ return proto.getDownstreamAckTimeNanos();
+ }
+
+ /**
+ * Check that this ack contains no non-SUCCESS status
+ * @return true if all statuses are SUCCESS (also true when there are no replies)
+ */
+ public boolean isSuccess() {
+ for (Status s : proto.getReplyList()) {
+ if (s != Status.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the OOB status if this ack contains one: the first reply whose number lies in [OOB_START, OOB_END].
+ * @return null if it is not an OOB ack, i.e. the seqno is not UNKOWN_SEQNO or no reply is in that range.
+ */
+ public Status getOOBStatus() {
+ // Normal data transfer acks will have a valid sequence number, so
+ // this will return right away in most cases.
+ if (getSeqno() != UNKOWN_SEQNO) {
+ return null;
+ }
+ for (Status s : proto.getReplyList()) {
+ // The following check is valid because protobuf guarantees to
+ // preserve the ordering of enum elements.
+ if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+ return s;
+ }
+ }
+ return null;
+ }
+
+ /** Get the Restart OOB ack status */
+ public static Status getRestartOOBStatus() {
+ return Status.OOB_RESTART;
+ }
+
+ /** return true if it is the restart OOB status code; st must not be null because equals is invoked on it */
+ public static boolean isRestartOOBStatus(Status st) {
+ return st.equals(Status.OOB_RESTART);
+ }
+
+ /**** Writable interface ****/
+ public void readFields(InputStream in) throws IOException { // replaces proto with the message parsed from vintPrefixed(in)
+ proto = PipelineAckProto.parseFrom(vintPrefixed(in));
+ }
+
+ public void write(OutputStream out) throws IOException {
+ proto.writeDelimitedTo(out);
+ }
+
+ @Override //Object
+ public String toString() {
+ return TextFormat.shortDebugString(proto);
+ }
+
+ public static Status getStatusFromHeader(int header) {
+ return StatusFormat.getStatus(header);
+ }
+
+ public static ECN getECNFromHeader(int header) {
+ return StatusFormat.getECN(header);
+ }
+
+ public static int setStatusForHeader(int old, Status status) { // returns old with only the status bits replaced
+ return StatusFormat.setStatus(old, status);
+ }
+
+ public static int combineHeader(ECN ecn, Status status) { // packs status and ecn into a fresh header that starts at 0
+ int header = 0;
+ header = StatusFormat.setStatus(header, status);
+ header = StatusFormat.setECN(header, ecn);
+ return header;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
new file mode 100644
index 0000000..9399d84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.Serializable;
+
+
+/**
+ * Bit format in a long: one field of LENGTH bits starting at bit OFFSET of a {@code long} record; fields are chained through the {@code previous} constructor argument.
+ */
+public class LongBitFormat implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String NAME; // used only in the exception messages of combine()
+ /** Bit offset of the field's lowest bit: 0 for the first field, otherwise the end of the previous field */
+ private final int OFFSET;
+ /** Bit length (width) of the field */
+ private final int LENGTH;
+ /** Minimum value accepted by {@link #combine} */
+ private final long MIN;
+ /** Maximum value accepted by {@link #combine}: the low LENGTH bits all set */
+ private final long MAX;
+ /** Bit mask selecting this field within a record: MAX shifted left by OFFSET */
+ private final long MASK;
+
+ public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
+ NAME = name;
+ OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH; // start right after the previous field, or at bit 0
+ LENGTH = length;
+ MIN = min;
+ MAX = ((-1L) >>> (64 - LENGTH)); // NOTE(review): Java masks a long shift count to 6 bits, so LENGTH == 0 yields MAX == -1 rather than 0
+ MASK = MAX << OFFSET;
+ }
+
+ /** Retrieve the value from the record: isolates this field's bits and shifts them down to bit 0. */
+ public long retrieve(long record) {
+ return (record & MASK) >>> OFFSET;
+ }
+
+ /** Combine the value to the record: returns record with this field replaced by value; throws IllegalArgumentException if value is outside [MIN, MAX]. */
+ public long combine(long value, long record) {
+ if (value < MIN) {
+ throw new IllegalArgumentException(
+ "Illagal value: " + NAME + " = " + value + " < MIN = " + MIN); // NOTE(review): "Illagal" looks like a typo for "Illegal"; it is runtime text, so it is left unchanged
+ }
+ if (value > MAX) {
+ throw new IllegalArgumentException(
+ "Illagal value: " + NAME + " = " + value + " > MAX = " + MAX);
+ }
+ return (record & ~MASK) | (value << OFFSET); // clear the field's old bits, then OR in the new value
+ }
+
+ public long getMin() { // the lower bound enforced by combine()
+ return MIN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b9b89aa..e67c9d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -888,6 +888,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9021. Use a yellow elephant rather than a blue one in diagram. (wang)
+ HDFS-9012. Move o.a.h.hdfs.protocol.datatransfer.PipelineAck class to
+ hadoop-hdfs-client module. (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
deleted file mode 100644
index 44f38c6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
-
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import com.google.protobuf.TextFormat;
-import org.apache.hadoop.hdfs.util.LongBitFormat;
-
-/** Pipeline Acknowledgment **/
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class PipelineAck {
- PipelineAckProto proto;
- public final static long UNKOWN_SEQNO = -2;
- final static int OOB_START = Status.OOB_RESTART_VALUE; // the first OOB type
- final static int OOB_END = Status.OOB_RESERVED3_VALUE; // the last OOB type
- final static int NUM_OOB_TYPES = OOB_END - OOB_START + 1;
- // place holder for timeout value of each OOB type
- final static long[] OOB_TIMEOUT;
-
- public enum ECN {
- DISABLED(0),
- SUPPORTED(1),
- SUPPORTED2(2),
- CONGESTED(3);
-
- private final int value;
- private static final ECN[] VALUES = values();
- static ECN valueOf(int value) {
- return VALUES[value];
- }
-
- ECN(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
- }
-
- private enum StatusFormat {
- STATUS(null, 4),
- RESERVED(STATUS.BITS, 1),
- ECN_BITS(RESERVED.BITS, 2);
-
- private final LongBitFormat BITS;
-
- StatusFormat(LongBitFormat prev, int bits) {
- BITS = new LongBitFormat(name(), prev, bits, 0);
- }
-
- static Status getStatus(int header) {
- return Status.valueOf((int) STATUS.BITS.retrieve(header));
- }
-
- static ECN getECN(int header) {
- return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
- }
-
- public static int setStatus(int old, Status status) {
- return (int) STATUS.BITS.combine(status.getNumber(), old);
- }
-
- public static int setECN(int old, ECN ecn) {
- return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
- }
- }
-
- static {
- OOB_TIMEOUT = new long[NUM_OOB_TYPES];
- HdfsConfiguration conf = new HdfsConfiguration();
- String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
- DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
- for (int i = 0; i < NUM_OOB_TYPES; i++) {
- OOB_TIMEOUT[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0;
- }
- }
-
- /** default constructor **/
- public PipelineAck() {
- }
-
- /**
- * Constructor assuming no next DN in pipeline
- * @param seqno sequence number
- * @param replies an array of replies
- */
- public PipelineAck(long seqno, int[] replies) {
- this(seqno, replies, 0L);
- }
-
- /**
- * Constructor
- * @param seqno sequence number
- * @param replies an array of replies
- * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
- */
- public PipelineAck(long seqno, int[] replies,
- long downstreamAckTimeNanos) {
- ArrayList<Status> statusList = Lists.newArrayList();
- ArrayList<Integer> flagList = Lists.newArrayList();
- for (int r : replies) {
- statusList.add(StatusFormat.getStatus(r));
- flagList.add(r);
- }
- proto = PipelineAckProto.newBuilder()
- .setSeqno(seqno)
- .addAllReply(statusList)
- .addAllFlag(flagList)
- .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
- .build();
- }
-
- /**
- * Get the sequence number
- * @return the sequence number
- */
- public long getSeqno() {
- return proto.getSeqno();
- }
-
- /**
- * Get the number of replies
- * @return the number of replies
- */
- public short getNumOfReplies() {
- return (short)proto.getReplyCount();
- }
-
- /**
- * get the header flag of ith reply
- */
- public int getHeaderFlag(int i) {
- if (proto.getFlagCount() > 0) {
- return proto.getFlag(i);
- } else {
- return combineHeader(ECN.DISABLED, proto.getReply(i));
- }
- }
-
- public int getFlag(int i) {
- return proto.getFlag(i);
- }
-
- /**
- * Get the time elapsed for downstream ack RTT in nanoseconds
- * @return time elapsed for downstream ack in nanoseconds, 0 if no next DN in pipeline
- */
- public long getDownstreamAckTimeNanos() {
- return proto.getDownstreamAckTimeNanos();
- }
-
- /**
- * Check if this ack contains error status
- * @return true if all statuses are SUCCESS
- */
- public boolean isSuccess() {
- for (Status s : proto.getReplyList()) {
- if (s != Status.SUCCESS) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Returns the OOB status if this ack contains one.
- * @return null if it is not an OOB ack.
- */
- public Status getOOBStatus() {
- // Normal data transfer acks will have a valid sequence number, so
- // this will return right away in most cases.
- if (getSeqno() != UNKOWN_SEQNO) {
- return null;
- }
- for (Status s : proto.getReplyList()) {
- // The following check is valid because protobuf guarantees to
- // preserve the ordering of enum elements.
- if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
- return s;
- }
- }
- return null;
- }
-
- /**
- * Get the timeout to be used for transmitting the OOB type
- * @return the timeout in milliseconds
- */
- public static long getOOBTimeout(Status status) throws IOException {
- int index = status.getNumber() - OOB_START;
- if (index >= 0 && index < NUM_OOB_TYPES) {
- return OOB_TIMEOUT[index];
- }
- // Not an OOB.
- throw new IOException("Not an OOB status: " + status);
- }
-
- /** Get the Restart OOB ack status */
- public static Status getRestartOOBStatus() {
- return Status.OOB_RESTART;
- }
-
- /** return true if it is the restart OOB status code */
- public static boolean isRestartOOBStatus(Status st) {
- return st.equals(Status.OOB_RESTART);
- }
-
- /**** Writable interface ****/
- public void readFields(InputStream in) throws IOException {
- proto = PipelineAckProto.parseFrom(vintPrefixed(in));
- }
-
- public void write(OutputStream out) throws IOException {
- proto.writeDelimitedTo(out);
- }
-
- @Override //Object
- public String toString() {
- return TextFormat.shortDebugString(proto);
- }
-
- public static Status getStatusFromHeader(int header) {
- return StatusFormat.getStatus(header);
- }
-
- public static ECN getECNFromHeader(int header) {
- return StatusFormat.getECN(header);
- }
-
- public static int setStatusForHeader(int old, Status status) {
- return StatusFormat.setStatus(old, status);
- }
-
- public static int combineHeader(ECN ecn, Status status) {
- int header = 0;
- header = StatusFormat.setStatus(header, status);
- header = StatusFormat.setECN(header, ecn);
- return header;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 1cb308f..bc5396f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1153,7 +1153,7 @@ class BlockReceiver implements Closeable {
synchronized(this) {
if (sending) {
- wait(PipelineAck.getOOBTimeout(ackStatus));
+ wait(datanode.getOOBTimeout(ackStatus));
// Didn't get my turn in time. Give up.
if (sending) {
throw new IOException("Could not send OOB reponse in time: "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 42cbd96..e0adc6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -40,6 +40,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
@@ -359,6 +361,8 @@ public class DataNode extends ReconfigurableBase
.availableProcessors();
private static final double CONGESTION_RATIO = 1.5;
+ private long[] oobTimeouts; /** timeout value of each OOB type */
+
/**
* Creates a dummy DataNode for testing purpose.
*/
@@ -373,6 +377,7 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = false;
this.blockScanner = new BlockScanner(this, conf);
this.pipelineSupportECN = false;
+ initOOBTimeout();
}
/**
@@ -446,6 +451,8 @@ public class DataNode extends ReconfigurableBase
return ret;
}
});
+
+ initOOBTimeout();
}
@Override // ReconfigurableBase
@@ -3226,4 +3233,35 @@ public class DataNode extends ReconfigurableBase
checkSuperuserPrivilege();
spanReceiverHost.removeSpanReceiver(id);
}
+
+ /**
+ * Get timeout value of each OOB type from configuration: parses the comma-separated list into oobTimeouts, one slot per OOB status; slots without an entry are set to 0
+ */
+ private void initOOBTimeout() {
+ final int oobStart = Status.OOB_RESTART_VALUE; // the first OOB type
+ final int oobEnd = Status.OOB_RESERVED3_VALUE; // the last OOB type
+ final int numOobTypes = oobEnd - oobStart + 1;
+ oobTimeouts = new long[numOobTypes];
+
+ final String[] ele = conf.get(DFS_DATANODE_OOB_TIMEOUT_KEY,
+ DFS_DATANODE_OOB_TIMEOUT_DEFAULT).split(",");
+ for (int i = 0; i < numOobTypes; i++) {
+ oobTimeouts[i] = (i < ele.length) ? Long.parseLong(ele[i]) : 0; // NOTE(review): entries are not trimmed, so a value such as " 1500" makes parseLong throw NumberFormatException
+ }
+ }
+
+ /**
+ * Get the timeout to be used for transmitting the OOB type; throws IOException when status is outside the OOB_RESTART..OOB_RESERVED3 range
+ * @return the timeout in milliseconds
+ */
+ public long getOOBTimeout(Status status)
+ throws IOException {
+ if (status.getNumber() < Status.OOB_RESTART_VALUE ||
+ status.getNumber() > Status.OOB_RESERVED3_VALUE) {
+ // Not an OOB.
+ throw new IOException("Not an OOB status: " + status);
+ }
+
+ return oobTimeouts[status.getNumber() - Status.OOB_RESTART_VALUE]; // index is the status's offset from the first OOB type
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d16c4eee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
deleted file mode 100644
index 9399d84..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LongBitFormat.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.util;
-
-import java.io.Serializable;
-
-
-/**
- * Bit format in a long.
- */
-public class LongBitFormat implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final String NAME;
- /** Bit offset */
- private final int OFFSET;
- /** Bit length */
- private final int LENGTH;
- /** Minimum value */
- private final long MIN;
- /** Maximum value */
- private final long MAX;
- /** Bit mask */
- private final long MASK;
-
- public LongBitFormat(String name, LongBitFormat previous, int length, long min) {
- NAME = name;
- OFFSET = previous == null? 0: previous.OFFSET + previous.LENGTH;
- LENGTH = length;
- MIN = min;
- MAX = ((-1L) >>> (64 - LENGTH));
- MASK = MAX << OFFSET;
- }
-
- /** Retrieve the value from the record. */
- public long retrieve(long record) {
- return (record & MASK) >>> OFFSET;
- }
-
- /** Combine the value to the record. */
- public long combine(long value, long record) {
- if (value < MIN) {
- throw new IllegalArgumentException(
- "Illagal value: " + NAME + " = " + value + " < MIN = " + MIN);
- }
- if (value > MAX) {
- throw new IllegalArgumentException(
- "Illagal value: " + NAME + " = " + value + " > MAX = " + MAX);
- }
- return (record & ~MASK) | (value << OFFSET);
- }
-
- public long getMin() {
- return MIN;
- }
-}