Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2017/08/01 20:34:44 UTC

hadoop git commit: HDFS-12151. Hadoop 2 clients cannot writeBlock to Hadoop 3 DataNodes.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 72eba25e0 -> 777475983


HDFS-12151. Hadoop 2 clients cannot writeBlock to Hadoop 3 DataNodes.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77747598
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77747598
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77747598

Branch: refs/heads/trunk
Commit: 7774759830f32ef97ac9f157dbb210264b7d042a
Parents: 72eba25
Author: Sean Mackrory <ma...@apache.org>
Authored: Mon Jul 17 12:12:41 2017 -0600
Committer: Sean Mackrory <ma...@apache.org>
Committed: Tue Aug 1 14:34:25 2017 -0600

----------------------------------------------------------------------
 .../hdfs/server/datanode/DataXceiver.java       |   9 +-
 .../TestDataXceiverBackwardsCompat.java         | 212 +++++++++++++++++++
 2 files changed, 219 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
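
Background, not part of the patch: a Hadoop 2 client's writeBlock request
carries no target storage IDs, so a Hadoop 3 DataNode decodes
targetStorageIds as a zero-length array, and the unguarded
targetStorageIds[0] in the pipeline-mirroring path threw
ArrayIndexOutOfBoundsException. A minimal sketch of that failure mode
(hypothetical class name, not from the commit):

    // Reproduces the pre-patch failure in isolation.
    public class EmptyStorageIdRepro {
      public static void main(String[] args) {
        // An older client never populates target storage IDs, so the
        // server-side array is empty.
        String[] targetStorageIds = new String[0];
        // The pre-patch DataXceiver indexed it unconditionally when
        // relaying writeBlock to the next DataNode in the pipeline:
        String first = targetStorageIds[0]; // ArrayIndexOutOfBoundsException
      }
    }

The hunk below replaces the direct index with a guarded lookup that falls
back to null.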


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77747598/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 8ffd3a4..3216a78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -811,20 +811,25 @@ class DataXceiver extends Receiver implements Runnable {
               smallBufferSize));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
+          String targetStorageId = null;
+          if (targetStorageIds.length > 0) {
+            // Older clients may not have provided any targetStorageIds
+            targetStorageId = targetStorageIds[0];
+          }
           if (targetPinnings != null && targetPinnings.length > 0) {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                 blockToken, clientname, targets, targetStorageTypes,
                 srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                 latestGenerationStamp, requestedChecksum, cachingStrategy,
                 allowLazyPersist, targetPinnings[0], targetPinnings,
-                targetStorageIds[0], targetStorageIds);
+                targetStorageId, targetStorageIds);
           } else {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                 blockToken, clientname, targets, targetStorageTypes,
                 srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                 latestGenerationStamp, requestedChecksum, cachingStrategy,
                 allowLazyPersist, false, targetPinnings,
-                targetStorageIds[0], targetStorageIds);
+                targetStorageId, targetStorageIds);
           }
 
           mirrorOut.flush();
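
A hedged illustration of the guard's effect, outside the patch (the storage
ID value and class name are hypothetical):

    // Same null-fallback pattern as the new DataXceiver code above.
    public class GuardedStorageIdLookup {
      static String firstOrNull(String[] targetStorageIds) {
        String targetStorageId = null;
        if (targetStorageIds.length > 0) {
          // Older clients may not have provided any targetStorageIds
          targetStorageId = targetStorageIds[0];
        }
        return targetStorageId;
      }

      public static void main(String[] args) {
        // Hadoop 2 client: empty array -> null, instead of an exception.
        System.out.println(firstOrNull(new String[0]));
        // Hadoop 3 client: the first ID is forwarded unchanged.
        System.out.println(firstOrNull(new String[] {"DS-0001"}));
      }
    }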

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77747598/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
new file mode 100644
index 0000000..bdcbe7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.net.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.ArgumentCaptor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Mock-based unit test to verify that DataXceiver does not fail when no
+ * storageId or targetStorageIds are passed - as is the case in Hadoop 2.x.
+ */
+public class TestDataXceiverBackwardsCompat {
+  @Rule
+  public Timeout timeout = new Timeout(60000);
+
+  private void failWithException(String message, Exception exception) {
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    exception.printStackTrace(new PrintStream(buffer));
+    String stacktrace = buffer.toString();
+
+    fail(message + ": " + exception + "; " + exception.getMessage() + "\n" +
+        stacktrace);
+  }
+
+  /**
+   * Used for mocking DataNode. Mockito does not provide a way to mock
+   * properties (like data or saslClient) so we have to manually set up mocks
+   * of those properties inside our own class.
+   */
+  public class NullDataNode extends DataNode {
+    public NullDataNode(Configuration conf, OutputStream out, int port) throws
+        Exception {
+      super(conf);
+      data = (FsDatasetSpi<FsVolumeSpi>)mock(FsDatasetSpi.class);
+      saslClient = mock(SaslDataTransferClient.class);
+
+      IOStreamPair pair = new IOStreamPair(null, out);
+
+      doReturn(pair).when(saslClient).socketSend(
+          any(Socket.class), any(OutputStream.class), any(InputStream.class),
+          any(DataEncryptionKeyFactory.class), any(Token.class),
+          any(DatanodeID.class));
+      doReturn(mock(ReplicaHandler.class)).when(data).createTemporary(
+          any(StorageType.class), any(String.class), any(ExtendedBlock.class),
+          anyBoolean());
+
+      new Thread(new NullServer(port)).start();
+    }
+
+    @Override
+    public DatanodeRegistration getDNRegistrationForBP(String bpid)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public Socket newSocket() throws IOException {
+      return new Socket();
+    }
+
+    /**
+     * Class for accepting an incoming connection. Does not read data or
+     * respond in any way: it simply allows a single client to connect to a
+     * local port.
+     */
+    private class NullServer implements Runnable {
+
+      private ServerSocket serverSocket;
+
+      NullServer(int port) throws IOException {
+        serverSocket = new ServerSocket(port);
+      }
+
+      @Override
+      public void run() {
+        try {
+          serverSocket.accept();
+          serverSocket.close();
+          LOG.info("Client connection accepted by NullServer");
+        } catch (Exception e) {
+          LOG.info("Exception in NullServer: " + e + "; " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testBackwardsCompat() throws Exception {
+    Peer peer = mock(Peer.class);
+    doReturn("").when(peer).getRemoteAddressString();
+    Configuration conf = new Configuration();
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int port = ServerSocketUtil.getPort(1234, 10);
+    DataNode dataNode = new NullDataNode(conf, out, port);
+    DataXceiverServer server = new DataXceiverServer(
+        mock(PeerServer.class), conf, dataNode);
+    DataXceiver xceiver = spy(DataXceiver.create(peer, dataNode, server));
+
+    BlockReceiver mockBlockReceiver = mock(BlockReceiver.class);
+    doReturn(mock(Replica.class)).when(mockBlockReceiver).getReplica();
+
+    DatanodeInfo[] targets = {mock(DatanodeInfo.class)};
+    doReturn("localhost:" + port).when(targets[0]).getXferAddr(true);
+    doReturn("127.0.0.1:" + port).when(targets[0]).getXferAddr(false);
+    StorageType[] storageTypes = {StorageType.RAM_DISK};
+
+    doReturn(mockBlockReceiver).when(xceiver).getBlockReceiver(
+        any(ExtendedBlock.class), any(StorageType.class),
+        any(DataInputStream.class), anyString(), anyString(),
+        any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
+        anyString(), any(DatanodeInfo.class), any(DataNode.class),
+        any(DataChecksum.class), any(CachingStrategy.class),
+        ArgumentCaptor.forClass(Boolean.class).capture(),
+        anyBoolean(), any(String.class));
+
+    Token<BlockTokenIdentifier> token = (Token<BlockTokenIdentifier>)mock(
+        Token.class);
+    doReturn("".getBytes()).when(token).getIdentifier();
+    doReturn("".getBytes()).when(token).getPassword();
+    doReturn(new Text("")).when(token).getKind();
+    doReturn(new Text("")).when(token).getService();
+
+    DataChecksum checksum = mock(DataChecksum.class);
+    doReturn(DataChecksum.Type.NULL).when(checksum).getChecksumType();
+
+    DatanodeInfo datanodeInfo = mock(DatanodeInfo.class);
+    doReturn("localhost").when(datanodeInfo).getHostName();
+    doReturn("127.0.0.1").when(datanodeInfo).getIpAddr();
+    doReturn(DatanodeInfo.AdminStates.NORMAL).when(datanodeInfo)
+        .getAdminState();
+
+    Exception storedException = null;
+    try {
+      xceiver.writeBlock(
+          new ExtendedBlock("Dummy-pool", 0L),
+          StorageType.RAM_DISK,
+          token,
+          "Dummy-Client",
+          targets,
+          storageTypes,
+          datanodeInfo,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE,
+          0, 0, 0, 0,
+          checksum,
+          CachingStrategy.newDefaultStrategy(),
+          false,
+          false, new boolean[0], null, new String[0]);
+    } catch (Exception e) {
+      // Not enough things have been mocked for this to complete without
+      // exceptions, but we want to make sure we can at least get as far as
+      // sending data to the server with null values for storageId and
+      // targetStorageTypes.
+      storedException = e;
+    }
+    byte[] output = out.toByteArray();
+    if (output.length == 0) {
+      if (storedException == null) {
+        fail("No output written, but no exception either (this " +
+            "shouldn't happen)");
+      } else {
+        failWithException("Exception occurred before anything was written",
+            storedException);
+      }
+    }
+  }
+}
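
The new test should be runnable on its own with the usual surefire
invocation from the module shown in the diffstat (assuming a standard
Maven setup):

    cd hadoop-hdfs-project/hadoop-hdfs
    mvn test -Dtest=TestDataXceiverBackwardsCompat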

