You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2017/12/16 02:10:34 UTC
[07/46] hadoop git commit: HDFS-11653. [READ] ProvidedReplica should
return an InputStream that is bounded by its length
HDFS-11653. [READ] ProvidedReplica should return an InputStream that is bounded by its length
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1108cb76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1108cb76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1108cb76
Branch: refs/heads/trunk
Commit: 1108cb76917debf0a8541d5130e015883eb521af
Parents: d65df0f
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Thu May 4 12:43:48 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Dec 15 17:51:37 2017 -0800
----------------------------------------------------------------------
.../hdfs/server/datanode/ProvidedReplica.java | 5 +-
.../datanode/TestProvidedReplicaImpl.java | 163 +++++++++++++++++++
2 files changed, 167 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1108cb76/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
index b021ea2..946ab5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +100,8 @@ public abstract class ProvidedReplica extends ReplicaInfo {
if (remoteFS != null) {
FSDataInputStream ins = remoteFS.open(new Path(fileURI));
ins.seek(fileOffset + seekOffset);
- return new FSDataInputStream(ins);
+ return new BoundedInputStream(
+ new FSDataInputStream(ins), getBlockDataLength());
} else {
throw new IOException("Remote filesystem for provided replica " + this +
" does not exist");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1108cb76/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
new file mode 100644
index 0000000..8258c21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestProvidedReplicaImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests the implementation of {@link ProvidedReplica}.
+ */
+public class TestProvidedReplicaImpl {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestProvidedReplicaImpl.class);
+ private static final String BASE_DIR =
+ new FileSystemTestHelper().getTestRootDir();
+ private static final String FILE_NAME = "provided-test";
+ //length of the file that is associated with the provided blocks.
+ private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024;
+ //length of each provided block.
+ private static final long BLK_LEN = 128 * 1024L;
+
+ private static List<ProvidedReplica> replicas;
+
+ private static void createFileIfNotExists(String baseDir) throws IOException {
+ File newFile = new File(baseDir, FILE_NAME);
+ newFile.getParentFile().mkdirs();
+ if(!newFile.exists()) {
+ newFile.createNewFile();
+ OutputStream writer = new FileOutputStream(newFile.getAbsolutePath());
+ //FILE_LEN is length in bytes.
+ byte[] bytes = new byte[1];
+ bytes[0] = (byte) 0;
+ for(int i=0; i< FILE_LEN; i++) {
+ writer.write(bytes);
+ }
+ writer.flush();
+ writer.close();
+ LOG.info("Created provided file " + newFile +
+ " of length " + newFile.length());
+ }
+ }
+
+ private static void createProvidedReplicas(Configuration conf) {
+ long numReplicas = (long) Math.ceil((double) FILE_LEN/BLK_LEN);
+ File providedFile = new File(BASE_DIR, FILE_NAME);
+ replicas = new ArrayList<ProvidedReplica>();
+
+ LOG.info("Creating " + numReplicas + " provided replicas");
+ for (int i=0; i<numReplicas; i++) {
+ long currentReplicaLength =
+ FILE_LEN >= (i+1)*BLK_LEN ? BLK_LEN : FILE_LEN - i*BLK_LEN;
+ replicas.add(
+ new FinalizedProvidedReplica(i, providedFile.toURI(), i*BLK_LEN,
+ currentReplicaLength, 0, null, conf));
+ }
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ createFileIfNotExists(new File(BASE_DIR).getAbsolutePath());
+ createProvidedReplicas(new Configuration());
+ }
+
+ /**
+ * Checks if {@code ins} matches the provided file from offset
+ * {@code fileOffset} for length {@ dataLength}.
+ * @param file the local file
+ * @param ins input stream to compare against
+ * @param fileOffset offset
+ * @param dataLength length
+ * @throws IOException
+ */
+ private void verifyReplicaContents(File file,
+ InputStream ins, long fileOffset, long dataLength)
+ throws IOException {
+
+ InputStream fileIns = new FileInputStream(file);
+ fileIns.skip(fileOffset);
+
+ try (ReadableByteChannel i =
+ Channels.newChannel(new BoundedInputStream(fileIns, dataLength))) {
+ try (ReadableByteChannel j = Channels.newChannel(ins)) {
+ ByteBuffer ib = ByteBuffer.allocate(4096);
+ ByteBuffer jb = ByteBuffer.allocate(4096);
+ while (true) {
+ int il = i.read(ib);
+ int jl = j.read(jb);
+ if (il < 0 || jl < 0) {
+ assertEquals(il, jl);
+ break;
+ }
+ ib.flip();
+ jb.flip();
+ int cmp = Math.min(ib.remaining(), jb.remaining());
+ for (int k = 0; k < cmp; ++k) {
+ assertEquals(ib.get(), jb.get());
+ }
+ ib.compact();
+ jb.compact();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testProvidedReplicaRead() throws IOException {
+
+ File providedFile = new File(BASE_DIR, FILE_NAME);
+ for(int i=0; i < replicas.size(); i++) {
+ ProvidedReplica replica = replicas.get(i);
+ //block data should exist!
+ assertTrue(replica.blockDataExists());
+ assertEquals(providedFile.toURI(), replica.getBlockURI());
+ verifyReplicaContents(providedFile, replica.getDataInputStream(0),
+ BLK_LEN*i, replica.getBlockDataLength());
+ }
+ LOG.info("All replica contents verified");
+
+ providedFile.delete();
+ //the block data should no longer be found!
+ for(int i=0; i < replicas.size(); i++) {
+ ProvidedReplica replica = replicas.get(i);
+ assertTrue(!replica.blockDataExists());
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org