You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/12/01 00:10:05 UTC

[GitHub] [ozone] hanishakoneru commented on a change in pull request #1523: HDDS-4320. Let Ozone input streams implement CanUnbuffer

hanishakoneru commented on a change in pull request #1523:
URL: https://github.com/apache/ozone/pull/1523#discussion_r532801165



##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -171,6 +162,23 @@ public synchronized void initialize() throws IOException {
     }
   }
 
+  private void refreshPipeline(IOException cause) throws IOException {
+    LOG.error("Unable to read information for block {} from pipeline {}: {}",
+        blockID, pipeline.getId(), cause.getMessage());

Review comment:
       This error message might be confusing if the refresh pipeline function exists. In the else case under refreshPipelineFunction != null, can we change the log level to info maybe?

##########
File path: hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
##########
@@ -292,6 +305,11 @@ private synchronized void readChunkFromContainer(int len) throws IOException {
       startByteIndex = bufferOffset + bufferLength;
     }
 
+    // bufferOffset and bufferLength are updated below, but if read fails
+    // and is retried, we need the previous position.  Position is reset after
+    // successful read in adjustBufferPosition()
+    storePosition();

Review comment:
       Can we also please update the javadoc for chunkPosition?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ * Note: this is from Hadoop 3.3, can be removed after dependency upgrade.
+ */
+public abstract class AbstractContractUnbufferTest
+    extends AbstractFSContractTestBase {
+
+  private Path file;
+  private byte[] fileBytes;
+
+  private static final String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+    createFile(getFileSystem(), file, true, fileBytes);
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      validateFullFileContents(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      validateFullFileContents(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    Path emptyFile = path("emptyUnbufferFile");
+    getFileSystem().create(emptyFile, true).close();
+    describe("unbuffer an empty file");
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file before a read");
+    FSDataInputStream stream = null;
+    try {
+      stream = getFileSystem().open(file);
+      validateFullFileContents(stream);
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    if (stream != null) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testMultipleUnbuffers() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      unbuffer(stream);
+      validateFullFileContents(stream);
+      unbuffer(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferMultipleReads() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 8, 0);
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 8, TEST_FILE_LEN / 8);
+      validateFileContents(stream, TEST_FILE_LEN / 4, TEST_FILE_LEN / 4);
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 2, TEST_FILE_LEN / 2);
+      unbuffer(stream);
+      assertEquals("stream should be at end of file", TEST_FILE_LEN,
+              stream.getPos());
+    }
+  }
+
+  private void unbuffer(FSDataInputStream stream) throws IOException {
+    long pos = stream.getPos();
+    stream.unbuffer();
+    assertEquals("unbuffer unexpectedly changed the stream position", pos,
+            stream.getPos());
+  }
+
+  protected void validateFullFileContents(FSDataInputStream stream)
+          throws IOException {
+    validateFileContents(stream, TEST_FILE_LEN, 0);
+  }
+
+  protected void validateFileContents(FSDataInputStream stream, int length,
+                                      int startIndex)
+          throws IOException {
+    byte[] streamData = new byte[length];
+    assertEquals("failed to read expected number of bytes from "
+            + "stream. This may be transient",
+        length, stream.read(streamData));

Review comment:
       What are the transient cases in which this might fail? 
   Should there be a retry in those cases? Otherwise these tests might fail intermittently.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ * Note: this is from Hadoop 3.3, can be removed after dependency upgrade.
+ */
+public abstract class AbstractContractUnbufferTest
+    extends AbstractFSContractTestBase {
+
+  private Path file;
+  private byte[] fileBytes;
+
+  private static final String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+    createFile(getFileSystem(), file, true, fileBytes);
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      validateFullFileContents(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      validateFullFileContents(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    Path emptyFile = path("emptyUnbufferFile");
+    getFileSystem().create(emptyFile, true).close();
+    describe("unbuffer an empty file");
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file before a read");

Review comment:
       typo in description

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
##########
@@ -397,4 +410,118 @@ public void testSkip() throws Exception {
       Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
     }
   }
+
+  @Test
+  public void readAfterReplication() throws Exception {
+    testReadAfterReplication(false);
+  }
+
+  @Test
+  public void unbuffer() throws Exception {

Review comment:
       How about naming this method something like readAfterReplicationWithUnbuffering or something to represent what the test verifies?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
##########
@@ -397,4 +410,118 @@ public void testSkip() throws Exception {
       Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
     }
   }
+
+  @Test
+  public void readAfterReplication() throws Exception {
+    testReadAfterReplication(false);
+  }
+
+  @Test
+  public void unbuffer() throws Exception {
+    testReadAfterReplication(true);
+  }
+
+  private void testReadAfterReplication(boolean doUnbuffer) throws Exception {
+    Assume.assumeTrue(cluster.getHddsDatanodes().size() > 3);
+
+    int dataLength = 2 * chunkSize;
+    String keyName = getKeyName();
+    OzoneOutputStream key = TestHelper.createKey(keyName,
+        ReplicationType.RATIS, dataLength, objectStore, volumeName, bucketName);
+
+    byte[] data = writeRandomBytes(key, dataLength);
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+
+    OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
+    Assert.assertNotNull(locations);
+    List<OmKeyLocationInfo> locationInfoList = locations.getLocationList();
+    Assert.assertEquals(1, locationInfoList.size());
+    OmKeyLocationInfo loc = locationInfoList.get(0);
+    long containerID = loc.getContainerID();
+    Assert.assertEquals(3, countReplicas(containerID, cluster));
+
+    TestHelper.waitForContainerClose(cluster, containerID);
+
+    List<DatanodeDetails> pipelineNodes = loc.getPipeline().getNodes();
+
+    // read chunk data
+    try (KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName)
+        .readKey(keyName).getInputStream()) {
+
+      int b = keyInputStream.read();
+      Assert.assertNotEquals(-1, b);
+
+      if (doUnbuffer) {
+        keyInputStream.unbuffer();
+      }
+
+      // stop one node, wait for container to be replicated to another one
+      cluster.shutdownHddsDatanode(pipelineNodes.get(0));
+      waitForNodeToBecomeDead(pipelineNodes.get(0));
+      waitForReplicaCount(containerID, 2, cluster);
+      waitForReplicaCount(containerID, 3, cluster);

Review comment:
       waitForReplicaCount=3 will encompass waitForReplicaCount=2.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ * Note: this is from Hadoop 3.3, can be removed after dependency upgrade.
+ */
+public abstract class AbstractContractUnbufferTest
+    extends AbstractFSContractTestBase {
+
+  private Path file;
+  private byte[] fileBytes;
+
+  private static final String SUPPORTS_UNBUFFER = "supports-unbuffer";
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    fileBytes = dataset(TEST_FILE_LEN, 0, 255);
+    createFile(getFileSystem(), file, true, fileBytes);
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      validateFullFileContents(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      validateFullFileContents(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    Path emptyFile = path("emptyUnbufferFile");
+    getFileSystem().create(emptyFile, true).close();
+    describe("unbuffer an empty file");
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file before a read");
+    FSDataInputStream stream = null;
+    try {
+      stream = getFileSystem().open(file);
+      validateFullFileContents(stream);
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+    if (stream != null) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testMultipleUnbuffers() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      unbuffer(stream);
+      validateFullFileContents(stream);
+      unbuffer(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferMultipleReads() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 8, 0);
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 8, TEST_FILE_LEN / 8);
+      validateFileContents(stream, TEST_FILE_LEN / 4, TEST_FILE_LEN / 4);
+      unbuffer(stream);
+      validateFileContents(stream, TEST_FILE_LEN / 2, TEST_FILE_LEN / 2);
+      unbuffer(stream);
+      assertEquals("stream should be at end of file", TEST_FILE_LEN,
+              stream.getPos());
+    }
+  }
+
+  private void unbuffer(FSDataInputStream stream) throws IOException {
+    long pos = stream.getPos();
+    stream.unbuffer();
+    assertEquals("unbuffer unexpectedly changed the stream position", pos,
+            stream.getPos());

Review comment:
       unbuffer() here just checks that the position is maintained. Would it be possible to also verify that the buffers are actually released?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org