You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/10/05 06:01:24 UTC
[ozone] branch master updated: HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 649d713817 HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)
649d713817 is described below
commit 649d7138177c5547a6d0c8cf4610a4e359ff0436
Author: mingchao zhao <ca...@apache.org>
AuthorDate: Wed Oct 5 14:01:18 2022 +0800
HDDS-7235. EOFException occurs when executing TPC-DS using o3fs (#3764)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 2 +-
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 2 +-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 2 +-
.../client/io/ECBlockReconstructedInputStream.java | 2 +-
.../hdds/scm/storage/TestChunkInputStream.java | 9 ++-
.../ozone/client/io/TestECBlockInputStream.java | 3 +-
.../hadoop/fs/ozone/TestOzoneFSInputStream.java | 80 ++++++++++++++++++++-
.../java/org/apache/hadoop/ozone/TestDataUtil.java | 10 +++
.../src/test/resources/testSequenceFile | Bin 0 -> 96 bytes
9 files changed, 98 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a0a210dd58..a6af1ba3f7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -367,7 +367,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
}
checkOpen();
- if (pos < 0 || pos >= length) {
+ if (pos < 0 || pos > length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 2a22c2293f..b3d3a7e389 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -236,7 +236,7 @@ public class ChunkInputStream extends InputStream
*/
@Override
public synchronized void seek(long pos) throws IOException {
- if (pos < 0 || pos >= length) {
+ if (pos < 0 || pos > length) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 40d454a0a1..5734d4dc4b 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -420,7 +420,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
@Override
public synchronized void seek(long pos) throws IOException {
checkOpen();
- if (pos < 0 || pos >= getLength()) {
+ if (pos < 0 || pos > getLength()) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index fe93b2e3cb..96aaa36920 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -173,7 +173,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
@Override
public synchronized void seek(long pos) throws IOException {
ensureNotClosed();
- if (pos < 0 || pos >= getLength()) {
+ if (pos < 0 || pos > getLength()) {
if (pos == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index b2a03c5cd9..176f92194f 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
@@ -148,13 +148,12 @@ public class TestChunkInputStream {
seekAndVerify(0);
try {
- seekAndVerify(CHUNK_SIZE);
- Assert.fail("Seeking to Chunk Length should fail.");
+ seekAndVerify(CHUNK_SIZE + 1);
+ Assert.fail("Seeking to more than the length of Chunk should fail.");
} catch (EOFException e) {
GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
- + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
+ + (CHUNK_SIZE + 1) + " for chunk: " + CHUNK_NAME, e);
}
-
// Seek before read should update the ChunkInputStream#chunkPosition
seekAndVerify(25);
Assert.assertEquals(25, chunkStream.getChunkPosition());
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 8cf8451775..5197517925 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -326,7 +326,8 @@ public class TestECBlockInputStream {
ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
- assertThrows(EOFException.class, () -> ecb.seek(100));
+ // Seeking past the end of the stream should throw EOFException.
+ assertThrows(EOFException.class, () -> ecb.seek(101));
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
index 3cc3334fd8..df699aff6e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFSInputStream.java
@@ -18,19 +18,31 @@
package org.apache.hadoop.fs.ozone;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.UUID;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.StorageType;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.commons.lang3.RandomStringUtils;
@@ -57,8 +69,10 @@ public class TestOzoneFSInputStream {
public Timeout timeout = Timeout.seconds(300);
private static MiniOzoneCluster cluster = null;
private static FileSystem fs;
+ private static FileSystem ecFs;
private static Path filePath = null;
private static byte[] data = null;
+ private static OzoneConfiguration conf = null;
/**
* Create a MiniDFSCluster for testing.
@@ -69,11 +83,11 @@ public class TestOzoneFSInputStream {
*/
@BeforeClass
public static void init() throws Exception {
- OzoneConfiguration conf = new OzoneConfiguration();
+ conf = new OzoneConfiguration();
conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
BucketLayout.LEGACY.name());
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(3)
+ .setNumDatanodes(5)
.setChunkSize(2) // MB
.setBlockSize(8) // MB
.setStreamBufferFlushSize(2) // MB
@@ -95,6 +109,23 @@ public class TestOzoneFSInputStream {
try (FSDataOutputStream stream = fs.create(filePath)) {
stream.write(data);
}
+
+ // create EC bucket to be used by OzoneFileSystem
+ BucketArgs.Builder builder = BucketArgs.newBuilder();
+ builder.setStorageType(StorageType.DISK);
+ builder.setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ builder.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ 1024)));
+ BucketArgs omBucketArgs = builder.build();
+ String ecBucket = UUID.randomUUID().toString();
+ TestDataUtil.createBucket(cluster, bucket.getVolumeName(), omBucketArgs,
+ ecBucket);
+ String ecUri = String.format("%s://%s.%s/",
+ OzoneConsts.OZONE_URI_SCHEME, ecBucket, bucket.getVolumeName());
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, ecUri);
+ ecFs = FileSystem.get(conf);
}
/**
@@ -103,6 +134,7 @@ public class TestOzoneFSInputStream {
@AfterClass
public static void shutdown() throws IOException {
fs.close();
+ ecFs.close();
cluster.shutdown();
}
@@ -159,4 +191,48 @@ public class TestOzoneFSInputStream {
Assert.assertArrayEquals(value, buffer.array());
}
}
+
+ @Test
+ public void testSequenceFileReaderSync() throws IOException {
+ File srcfile = new File("src/test/resources/testSequenceFile");
+ Path path = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
+ InputStream input = new BufferedInputStream(new FileInputStream(srcfile));
+
+ // Upload the test SequenceFile
+ FSDataOutputStream output = fs.create(path);
+ IOUtils.copyBytes(input, output, 4096, true);
+ input.close();
+
+ // Start SequenceFile.Reader test
+ SequenceFile.Reader in = new SequenceFile.Reader(fs, path, conf);
+ long blockStart = -1;
+ // EOFException should not occur.
+ in.sync(0);
+ blockStart = in.getPosition();
+ // As with HDFS, the reader should end up positioned at the end of the file
+ Assert.assertEquals(srcfile.length(), blockStart);
+ in.close();
+ }
+
+ @Test
+ public void testSequenceFileReaderSyncEC() throws IOException {
+ File srcfile = new File("src/test/resources/testSequenceFile");
+ Path path = new Path("/" + RandomStringUtils.randomAlphanumeric(5));
+ InputStream input = new BufferedInputStream(new FileInputStream(srcfile));
+
+ // Upload the test SequenceFile
+ FSDataOutputStream output = ecFs.create(path);
+ IOUtils.copyBytes(input, output, 4096, true);
+ input.close();
+
+ // Start SequenceFile.Reader test
+ SequenceFile.Reader in = new SequenceFile.Reader(ecFs, path, conf);
+ long blockStart = -1;
+ // EOFException should not occur.
+ in.sync(0);
+ blockStart = in.getPosition();
+ // As with HDFS, the reader should end up positioned at the end of the file
+ Assert.assertEquals(srcfile.length(), blockStart);
+ in.close();
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
index 1e7a8b427e..438dbe9b2f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDataUtil.java
@@ -127,6 +127,16 @@ public final class TestDataUtil {
return createVolumeAndBucket(cluster, BucketLayout.LEGACY);
}
+ public static OzoneBucket createBucket(MiniOzoneCluster cluster,
+ String vol, BucketArgs bucketArgs, String bukName)
+ throws IOException {
+ OzoneClient client = cluster.getClient();
+ ObjectStore objectStore = client.getObjectStore();
+ OzoneVolume volume = objectStore.getVolume(vol);
+ volume.createBucket(bukName, bucketArgs);
+ return volume.getBucket(bukName);
+ }
+
public static OzoneBucket createVolumeAndBucket(MiniOzoneCluster cluster,
BucketLayout bucketLayout) throws IOException {
final int attempts = 5;
diff --git a/hadoop-ozone/integration-test/src/test/resources/testSequenceFile b/hadoop-ozone/integration-test/src/test/resources/testSequenceFile
new file mode 100644
index 0000000000..b35cba8cb9
Binary files /dev/null and b/hadoop-ozone/integration-test/src/test/resources/testSequenceFile differ
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org