Posted to commits@ozone.apache.org by ad...@apache.org on 2022/10/05 06:01:24 UTC

[ozone] branch master updated: HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 649d713817 HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)
649d713817 is described below

commit 649d7138177c5547a6d0c8cf4610a4e359ff0436
Author: mingchao zhao <ca...@apache.org>
AuthorDate: Wed Oct 5 14:01:18 2022 +0800

    HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   2 +-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   2 +-
 .../hadoop/ozone/client/io/ECBlockInputStream.java |   2 +-
 .../client/io/ECBlockReconstructedInputStream.java |   2 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |   9 ++-
 .../ozone/client/io/TestECBlockInputStream.java    |   3 +-
 .../hadoop/fs/ozone/TestOzoneFSInputStream.java    |  80 ++++++++++++++++++++-
 .../java/org/apache/hadoop/ozone/TestDataUtil.java |  10 +++
 .../src/test/resources/testSequenceFile            | Bin 0 -> 96 bytes
 9 files changed, 98 insertions(+), 12 deletions(-)
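
The essence of the fix is a one-character change repeated across four input
stream classes: the seek guard "pos >= length" became "pos > length", so
seeking to exactly the end of a stream is now valid and positions the stream
at EOF instead of throwing EOFException. A minimal sketch of the adopted
contract (simplified for illustration; not the actual Ozone classes):

    import java.io.EOFException;
    import java.io.IOException;

    class SeekBoundarySketch {
      private final long length;  // total stream length in bytes
      private long position;

      SeekBoundarySketch(long length) {
        this.length = length;
      }

      // Before HDDS-7235 the guard was "pos >= length", which rejected
      // pos == length and threw EOFException on a seek to end-of-stream.
      void seek(long pos) throws IOException {
        if (pos < 0 || pos > length) {
          throw new EOFException("EOF encountered at pos: " + pos);
        }
        position = pos;  // pos == length leaves the stream at EOF
      }
    }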

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a0a210dd58..a6af1ba3f7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -367,7 +367,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
     }
 
     checkOpen();
-    if (pos < 0 || pos >= length) {
+    if (pos < 0 || pos > length) {
       if (pos == 0) {
         // It is possible for length and pos to be zero in which case
         // seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 2a22c2293f..b3d3a7e389 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -236,7 +236,7 @@ public class ChunkInputStream extends InputStream
    */
   @Override
   public synchronized void seek(long pos) throws IOException {
-    if (pos < 0 || pos >= length) {
+    if (pos < 0 || pos > length) {
       if (pos == 0) {
         // It is possible for length and pos to be zero in which case
         // seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 40d454a0a1..5734d4dc4b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -420,7 +420,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void seek(long pos) throws IOException {
     checkOpen();
-    if (pos < 0 || pos >= getLength()) {
+    if (pos < 0 || pos > getLength()) {
       if (pos == 0) {
         // It is possible for length and pos to be zero in which case
         // seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index fe93b2e3cb..96aaa36920 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -173,7 +173,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void seek(long pos) throws IOException {
     ensureNotClosed();
-    if (pos < 0 || pos >= getLength()) {
+    if (pos < 0 || pos > getLength()) {
       if (pos == 0) {
         // It is possible for length and pos to be zero in which case
         // seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index b2a03c5cd9..176f92194f 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.ozone.test.GenericTestUtils;
 
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Before;
@@ -148,13 +148,12 @@ public class TestChunkInputStream {
     seekAndVerify(0);
 
     try {
-      seekAndVerify(CHUNK_SIZE);
-      Assert.fail("Seeking to Chunk Length should fail.");
+      seekAndVerify(CHUNK_SIZE + 1);
+      Assert.fail("Seeking to more than the length of Chunk should fail.");
     } catch (EOFException e) {
       GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
-          + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
+          + (CHUNK_SIZE + 1) + " for chunk: " + CHUNK_NAME, e);
     }
-
     // Seek before read should update the ChunkInputStream#chunkPosition
     seekAndVerify(25);
     Assert.assertEquals(25, chunkStream.getChunkPosition());
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 8cf8451775..5197517925 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -326,7 +326,8 @@ public class TestECBlockInputStream {
         ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
-      assertThrows(EOFException.class, () -> ecb.seek(100));
+      // Seeking past the length should throw EOFException.
+      assertThrows(EOFException.class, () -> ecb.seek(101));
     }
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
index 3cc3334fd8..df699aff6e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
@@ -18,19 +18,31 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.UUID;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 
 import org.apache.commons.lang3.RandomStringUtils;
@@ -57,8 +69,10 @@ public class TestOzoneFSInputStream {
   public Timeout timeout = Timeout.seconds(300);
   private static MiniOzoneCluster cluster = null;
   private static FileSystem fs;
+  private static FileSystem ecFs;
   private static Path filePath = null;
   private static byte[] data = null;
+  private static OzoneConfiguration conf = null;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -69,11 +83,11 @@ public class TestOzoneFSInputStream {
    */
   @BeforeClass
   public static void init() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
+    conf = new OzoneConfiguration();
     conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
         BucketLayout.LEGACY.name());
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3)
+        .setNumDatanodes(5)
         .setChunkSize(2) // MB
         .setBlockSize(8) // MB
         .setStreamBufferFlushSize(2) // MB
@@ -95,6 +109,23 @@ public class TestOzoneFSInputStream {
     try (FSDataOutputStream stream = fs.create(filePath)) {
       stream.write(data);
     }
+
+    // create EC bucket to be used by OzoneFileSystem
+    BucketArgs.Builder builder = BucketArgs.newBuilder();
+    builder.setStorageType(StorageType.DISK);
+    builder.setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    builder.setDefaultReplicationConfig(
+        new DefaultReplicationConfig(ReplicationType.EC,
+            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+                1024)));
+    BucketArgs omBucketArgs = builder.build();
+    String ecBucket = UUID.randomUUID().toString();
+    TestDataUtil.createBucket(cluster, bucket.getVolumeName(), omBucketArgs,
+        ecBucket);
+    String ecUri = String.format("%s://%s.%s/",
+        OzoneConsts.OZONE_URI_SCHEME, ecBucket, bucket.getVolumeName());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, ecUri);
+    ecFs = FileSystem.get(conf);
   }
 
   /**
@@ -103,6 +134,7 @@ public class TestOzoneFSInputStream {
   @AfterClass
   public static void shutdown() throws IOException {
     fs.close();
+    ecFs.close();
     cluster.shutdown();
   }
 
@@ -159,4 +191,48 @@ public class TestOzoneFSInputStream {
       Assert.assertArrayEquals(value, buffer.array());
     }
   }
+
+  @Test
+  public void testSequenceFileReaderSync() throws IOException {
+    File srcfile = new File("src/test/resources/testSequenceFile");
+    Path path = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
+    InputStream input = new BufferedInputStream(new FileInputStream(srcfile));
+
+    // Upload test SequenceFile file
+    FSDataOutputStream output = fs.create(path);
+    IOUtils.copyBytes(input, output, 4096, true);
+    input.close();
+
+    // Start SequenceFile.Reader test
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, path, conf);
+    long blockStart = -1;
+    // EOFException should not occur.
+    in.sync(0);
+    blockStart = in.getPosition();
+    // The behavior should be consistent with HDFS
+    Assert.assertEquals(srcfile.length(), blockStart);
+    in.close();
+  }
+
+  @Test
+  public void testSequenceFileReaderSyncEC() throws IOException {
+    File srcfile = new File("src/test/resources/testSequenceFile");
+    Path path = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
+    InputStream input = new BufferedInputStream(new FileInputStream(srcfile));
+
+    // Upload test SequenceFile file
+    FSDataOutputStream output = ecFs.create(path);
+    IOUtils.copyBytes(input, output, 4096, true);
+    input.close();
+
+    // Start SequenceFile.Reader test
+    SequenceFile.Reader in = new SequenceFile.Reader(ecFs, path, conf);
+    long blockStart = -1;
+    // EOFException should not occur.
+    in.sync(0);
+    blockStart = in.getPosition();
+    // The behavior should be consistent with HDFS
+    Assert.assertEquals(srcfile.length(), blockStart);
+    in.close();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 1e7a8b427e..438dbe9b2f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -127,6 +127,16 @@ public final class TestDataUtil {
     return createVolumeAndBucket(cluster, BucketLayout.LEGACY);
   }
 
+  public static OzoneBucket createBucket(MiniOzoneCluster cluster,
+      String vol, BucketArgs bucketArgs, String bukName)
+      throws IOException {
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+    OzoneVolume volume = objectStore.getVolume(vol);
+    volume.createBucket(bukName, bucketArgs);
+    return volume.getBucket(bukName);
+  }
+
   public static OzoneBucket createVolumeAndBucket(MiniOzoneCluster cluster,
       BucketLayout bucketLayout) throws IOException {
     final int attempts = 5;
diff --git a/hadoop-ozone/integration-test/src/test/resources/testSequenceFile b/hadoop-ozone/integration-test/src/test/resources/testSequenceFile
new file mode 100644
index 0000000000..b35cba8cb9
Binary files /dev/null and b/hadoop-ozone/integration-test/src/test/resources/testSequenceFile differ
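
For context on why the stricter bound mattered: SequenceFile.Reader#sync(pos)
scans forward for the next sync marker and, when too few bytes remain, seeks
to the end of the file, i.e. to pos == length. Under the old "pos >= length"
guard that end-of-file seek raised EOFException, which is what the TPC-DS run
over o3fs hit. A hedged sketch of the access pattern the new tests exercise
(fs, conf and path are assumed to be an Ozone FileSystem instance, a Hadoop
configuration, and the path of an uploaded SequenceFile):

    // Illustrative only; mirrors testSequenceFileReaderSync above.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    try {
      reader.sync(0);  // scans for the next sync marker; with none left,
                       // this seeks to end-of-file rather than failing
      long blockStart = reader.getPosition();  // equals the file length here
      System.out.println("next block starts at " + blockStart);
    } finally {
      reader.close();
    }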


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org