You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/08/06 15:05:19 UTC

[GitHub] [ozone] sodonnel opened a new pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

sodonnel opened a new pull request #2507:
URL: https://github.com/apache/ozone/pull/2507


   ## What changes were proposed in this pull request?
   
   Implement the happy-path read, which does not support "degraded read" (on the fly EC recovery), to read the data from a single EC block group.
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-4942
   
   ## How was this patch tested?
   
   New unit tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690799229



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       Just a note: if we go with ECBlockGroupInputStream, we also should rename ECBlockOutputStream, and keep the naming symmetric. I can live with both approach, if we are consistent with in/output stream names, but would go with the current one if it is on me ;)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690796524



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       I agree that this is more or less a strategy instead of a factory, in the sense of how it is used, on the other hand it is a strategy, given to the ECBlockInputstream, which provides regular BlockInputStream instances. In this sense it is creating objects. I checked into if it can be implemented as a functional interface, and passed in as a lambda, but Stephen is right, that would not make the code any more simpler or readable.
   If we agree (and I am) that factory would be an overload of the factory design concept, with different meaning here, then a good alternative might be BlockInputStreamProvider, or BlockInputStreamCreator as the name, but the code itself is fine. (Though, if we go with provider, I would change the method name to provide instead of create.)
   
   One more thing that I would question here, is the separation of the refreshFunction and the xceiverClientFactory into an internal state inside this type. I am unsure, but most likely ECBlockInputStream will be used from KeyInputStream#addStream(), and in that case, we can pass on both the client factory and the refresh function to the create/provide method there, and nulls in the current tests. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690778463



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Interface used to create BlockInputStreams.
+ */
+public interface BlockInputStreamFactory {

Review comment:
       Sorry, my bad, I had some assumptions and overlooked a few things here...
   @sodonnel after you asked offline for some extra explanation what I am thinking about here, I went and tried out, and realized that even if it is possible to create a functional interface, and use a lambda, you are right, it does not really express anything better...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691286265



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       > One more thing that I would question here, is the separation of the refreshFunction and the xceiverClientFactory into an internal state inside this type. I am unsure, but most likely ECBlockInputStream will be used from KeyInputStream#addStream(), and in that case, we can pass on both the client factory and the refresh function to the create/provide method there, and nulls in the current tests.
   
   I think the reason I put them into the type was to cut down the number of parameters getting passed from ECKeyInputStream (which does not exist yet) to ECBlockInputStream. One instance of the factory / provider can be created in ECKeyInputStream and then passed to the ECBlockInputStream constructor. It does not care about or use the xceiverFactory or refreshFunction except to pass them onwards to the underlying BlockInputStream . Right now, I had that inside the factory / provider, rather than storing them as instance variables in ECBlockInputStream.
   
   To make this change, I would need to add two parameters variables to the ECBlockInputStream constructor and then store them as instance variables within the class, and then pass them to the create() method.
   
   If you feel it makes things better, I am happy to make the change I suggested above.
   
   > then a good alternative might be BlockInputStreamProvider,
   
   OK - I will change the name to provider and the method to "provide" rather than create.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690464004



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       Is it make more sense to name it as BlockGroup InputStream?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);

Review comment:
       Nit: Format missing.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       you may want to put in try stream, so it will be closed automatically

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {

Review comment:
       Should we split them into separate tests?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      // We expect 3 block streams and it should have a length passed of
+      // ONEMB and 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      ecb.close();
+    } finally {
+      ecb.close();
+    }
+
+    // Several full stripes
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      // We expect 3 block streams and it should have a length passed of
+      // ONEMB and 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(3 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+  }
+
+  @Test
+  public void testSimpleRead() throws IOException {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       same a above. Use try for stream

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+  private static final int EOF = -1;
+
+  private final ECReplicationConfig repConfig;
+  private final int ecChunkSize;
+  private final BlockInputStreamFactory streamFactory;
+  private final boolean verifyChecksum;
+  private final OmKeyLocationInfo blockInfo;
+  private final DatanodeDetails[] dataLocations;
+  private final DatanodeDetails[] parityLocations;
+  private final BlockInputStream[] blockStreams;
+  private final int maxLocations;
+
+  private int position = 0;
+  private boolean closed = false;
+
+  public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.ecChunkSize = ecChunkSize;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.streamFactory = streamFactory;
+    this.maxLocations = repConfig.getData() + repConfig.getParity();
+    this.dataLocations = new DatanodeDetails[repConfig.getData()];
+    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+    setBlockLocations(this.blockInfo.getPipeline());
+  }
+
+  public synchronized boolean hasSufficientLocations() {
+    // Until we implement "on the fly" recovery, all data location must be
+    // present and we have enough locations if that is the case.
+    //
+    // The number of locations needed is a function of the EC Chunk size. If the
+    // block length is <= the chunk size, we should only have location 1. If it
+    // is greater than the chunk size but less than chunk_size * 2, then we must
+    // have two locations. If it is greater than chunk_size * data_num, then we
+    // must have all data_num locations.
+    int expectedDataBlocks =
+        (int)Math.min(
+            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+            repConfig.getData());
+    for (int i=0; i<expectedDataBlocks; i++) {
+      if (dataLocations[i] == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Using the current position, returns the index of the blockStream we should
+   * be reading from. This is the index in the internal array holding the
+   * stream reference. The block group index will be one greater than this.
+   * @return
+   */
+  private int currentStreamIndex() {
+    return ((position / ecChunkSize) % repConfig.getData());
+  }
+
+  /**
+   * Uses the current position and ecChunkSize to determine which of the
+   * internal block streams the next read should come from. Also opens the
+   * stream if it has not been opened already.
+   * @return BlockInput stream to read from.
+   */
+  private BlockInputStream getOrOpenStream() {
+    int ind = currentStreamIndex();
+    BlockInputStream stream = blockStreams[ind];
+    if (stream == null) {
+      // To read an EC block, we create a STANDALONE pipeline that contains the
+      // single location for the block index we want to read. The EC blocks are
+      // indexed from 1 to N, however the data locations are stored in the
+      // dataLocations array indexed from zero.
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setId(PipelineID.randomId())

Review comment:
       So there is no pipeline Id validation at DN? example if the writer written the block with pipeline id 10, then reader can use any id?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       We can use factory when we need to create different category of objects of same type. Do you have plan to create different type of blockInputStream here? If yes, then what parameter decides the type?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691137207



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();

Review comment:
       You are correct. It was just a copy and paste of the same code and I missed to remove the first one. I have fixed this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690464004



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       Is it make more sense to name it as BlockGroup InputStream?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);

Review comment:
       Nit: Format missing.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       you may want to put in try stream, so it will be closed automatically

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {

Review comment:
       Should we split them into separate tests?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      // We expect 3 block streams and it should have a length passed of
+      // ONEMB and 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      ecb.close();
+    } finally {
+      ecb.close();
+    }
+
+    // Several full stripes
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 9 * ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      // We expect 3 block streams and it should have a length passed of
+      // ONEMB and 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(3 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+  }
+
+  @Test
+  public void testSimpleRead() throws IOException {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       same a above. Use try for stream

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+  private static final int EOF = -1;
+
+  private final ECReplicationConfig repConfig;
+  private final int ecChunkSize;
+  private final BlockInputStreamFactory streamFactory;
+  private final boolean verifyChecksum;
+  private final OmKeyLocationInfo blockInfo;
+  private final DatanodeDetails[] dataLocations;
+  private final DatanodeDetails[] parityLocations;
+  private final BlockInputStream[] blockStreams;
+  private final int maxLocations;
+
+  private int position = 0;
+  private boolean closed = false;
+
+  public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.ecChunkSize = ecChunkSize;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.streamFactory = streamFactory;
+    this.maxLocations = repConfig.getData() + repConfig.getParity();
+    this.dataLocations = new DatanodeDetails[repConfig.getData()];
+    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+    setBlockLocations(this.blockInfo.getPipeline());
+  }
+
+  public synchronized boolean hasSufficientLocations() {
+    // Until we implement "on the fly" recovery, all data location must be
+    // present and we have enough locations if that is the case.
+    //
+    // The number of locations needed is a function of the EC Chunk size. If the
+    // block length is <= the chunk size, we should only have location 1. If it
+    // is greater than the chunk size but less than chunk_size * 2, then we must
+    // have two locations. If it is greater than chunk_size * data_num, then we
+    // must have all data_num locations.
+    int expectedDataBlocks =
+        (int)Math.min(
+            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+            repConfig.getData());
+    for (int i=0; i<expectedDataBlocks; i++) {
+      if (dataLocations[i] == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Using the current position, returns the index of the blockStream we should
+   * be reading from. This is the index in the internal array holding the
+   * stream reference. The block group index will be one greater than this.
+   * @return
+   */
+  private int currentStreamIndex() {
+    return ((position / ecChunkSize) % repConfig.getData());
+  }
+
+  /**
+   * Uses the current position and ecChunkSize to determine which of the
+   * internal block streams the next read should come from. Also opens the
+   * stream if it has not been opened already.
+   * @return BlockInput stream to read from.
+   */
+  private BlockInputStream getOrOpenStream() {
+    int ind = currentStreamIndex();
+    BlockInputStream stream = blockStreams[ind];
+    if (stream == null) {
+      // To read an EC block, we create a STANDALONE pipeline that contains the
+      // single location for the block index we want to read. The EC blocks are
+      // indexed from 1 to N, however the data locations are stored in the
+      // dataLocations array indexed from zero.
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setId(PipelineID.randomId())

Review comment:
       So there is no pipeline Id validation at DN? example if the writer written the block with pipeline id 10, then reader can use any id?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       We can use factory when we need to create different category of objects of same type. Do you have plan to create different type of blockInputStream here? If yes, then what parameter decides the type?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691135987



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+  private static final int EOF = -1;
+
+  private final ECReplicationConfig repConfig;
+  private final int ecChunkSize;
+  private final BlockInputStreamFactory streamFactory;
+  private final boolean verifyChecksum;
+  private final OmKeyLocationInfo blockInfo;
+  private final DatanodeDetails[] dataLocations;
+  private final DatanodeDetails[] parityLocations;
+  private final BlockInputStream[] blockStreams;
+  private final int maxLocations;
+
+  private int position = 0;
+  private boolean closed = false;
+
+  public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.ecChunkSize = ecChunkSize;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.streamFactory = streamFactory;
+    this.maxLocations = repConfig.getData() + repConfig.getParity();
+    this.dataLocations = new DatanodeDetails[repConfig.getData()];
+    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+    setBlockLocations(this.blockInfo.getPipeline());
+  }
+
+  public synchronized boolean hasSufficientLocations() {
+    // Until we implement "on the fly" recovery, all data location must be
+    // present and we have enough locations if that is the case.
+    //
+    // The number of locations needed is a function of the EC Chunk size. If the
+    // block length is <= the chunk size, we should only have location 1. If it
+    // is greater than the chunk size but less than chunk_size * 2, then we must
+    // have two locations. If it is greater than chunk_size * data_num, then we
+    // must have all data_num locations.
+    int expectedDataBlocks =
+        (int)Math.min(
+            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+            repConfig.getData());
+    for (int i=0; i<expectedDataBlocks; i++) {
+      if (dataLocations[i] == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Using the current position, returns the index of the blockStream we should
+   * be reading from. This is the index in the internal array holding the
+   * stream reference. The block group index will be one greater than this.
+   * @return
+   */
+  private int currentStreamIndex() {
+    return ((position / ecChunkSize) % repConfig.getData());
+  }
+
+  /**
+   * Uses the current position and ecChunkSize to determine which of the
+   * internal block streams the next read should come from. Also opens the
+   * stream if it has not been opened already.
+   * @return BlockInput stream to read from.
+   */
+  private BlockInputStream getOrOpenStream() {
+    int ind = currentStreamIndex();
+    BlockInputStream stream = blockStreams[ind];
+    if (stream == null) {
+      // To read an EC block, we create a STANDALONE pipeline that contains the
+      // single location for the block index we want to read. The EC blocks are
+      // indexed from 1 to N, however the data locations are stored in the
+      // dataLocations array indexed from zero.
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setId(PipelineID.randomId())

Review comment:
       Yes I believe so. After the containers are closed, SCM creates a "read pipeline" to let you read the data. This is created on the fly for each location request, and is really just a list of locations. The pipeline object is reused for this. This means that a random ID is created for these read pipelines. The DN does not seem to care about the pipeline ID you used, but the ID needs to be there to build the pipeline object or it complains.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r692124818



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       Ah ok - I have fixed this though the file (there were a few occurrences of it).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690615049



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();

Review comment:
       Nit: I don't think we need to clear the map we just created, or am I missing something?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691702974



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);

Review comment:
       I mean in above line should have space b/w operator "3 * ONEMB"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691127849



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       I think either name is good, so rather than change things, lets stick with the current ECBlockInputStream, privuded @umamaheswararao is happy?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);

Review comment:
       I'm not sure what you are referring to here. Which format is missing?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream
+    implements Seekable, CanUnbuffer, ByteBufferReadable {
+
+  private static final int EOF = -1;
+
+  private final ECReplicationConfig repConfig;
+  private final int ecChunkSize;
+  private final BlockInputStreamFactory streamFactory;
+  private final boolean verifyChecksum;
+  private final OmKeyLocationInfo blockInfo;
+  private final DatanodeDetails[] dataLocations;
+  private final DatanodeDetails[] parityLocations;
+  private final BlockInputStream[] blockStreams;
+  private final int maxLocations;
+
+  private int position = 0;
+  private boolean closed = false;
+
+  public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize,
+      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+      BlockInputStreamFactory streamFactory) {
+    this.repConfig = repConfig;
+    this.ecChunkSize = ecChunkSize;
+    this.verifyChecksum = verifyChecksum;
+    this.blockInfo = blockInfo;
+    this.streamFactory = streamFactory;
+    this.maxLocations = repConfig.getData() + repConfig.getParity();
+    this.dataLocations = new DatanodeDetails[repConfig.getData()];
+    this.parityLocations = new DatanodeDetails[repConfig.getParity()];
+    this.blockStreams = new BlockInputStream[repConfig.getData()];
+
+    setBlockLocations(this.blockInfo.getPipeline());
+  }
+
+  public synchronized boolean hasSufficientLocations() {
+    // Until we implement "on the fly" recovery, all data location must be
+    // present and we have enough locations if that is the case.
+    //
+    // The number of locations needed is a function of the EC Chunk size. If the
+    // block length is <= the chunk size, we should only have location 1. If it
+    // is greater than the chunk size but less than chunk_size * 2, then we must
+    // have two locations. If it is greater than chunk_size * data_num, then we
+    // must have all data_num locations.
+    int expectedDataBlocks =
+        (int)Math.min(
+            Math.ceil((double)blockInfo.getLength() / ecChunkSize),
+            repConfig.getData());
+    for (int i=0; i<expectedDataBlocks; i++) {
+      if (dataLocations[i] == null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Using the current position, returns the index of the blockStream we should
+   * be reading from. This is the index in the internal array holding the
+   * stream reference. The block group index will be one greater than this.
+   * @return
+   */
+  private int currentStreamIndex() {
+    return ((position / ecChunkSize) % repConfig.getData());
+  }
+
+  /**
+   * Uses the current position and ecChunkSize to determine which of the
+   * internal block streams the next read should come from. Also opens the
+   * stream if it has not been opened already.
+   * @return BlockInput stream to read from.
+   */
+  private BlockInputStream getOrOpenStream() {
+    int ind = currentStreamIndex();
+    BlockInputStream stream = blockStreams[ind];
+    if (stream == null) {
+      // To read an EC block, we create a STANDALONE pipeline that contains the
+      // single location for the block index we want to read. The EC blocks are
+      // indexed from 1 to N, however the data locations are stored in the
+      // dataLocations array indexed from zero.
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Arrays.asList(dataLocations[ind]))
+          .setId(PipelineID.randomId())

Review comment:
       Yes I believe so. After the containers are closed, SCM creates a "read pipeline" to let you read the data. This is created on the fly for each location request, and is really just a list of locations. The pipeline object is reused for this. This means that a random ID is created for these read pipelines. The DN does not seem to care about the pipeline ID you used, but the ID needs to be there to build the pipeline object or it complains.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();

Review comment:
       You are correct. It was just a copy and paste of the same code and I missed to remove the first one. I have fixed this.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {

Review comment:
       I was trying to save some boiler plate code, but I think they are better in separate tests so I have changed them as suggested.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       OK - I have changed all the try to try with resources blocks and it makes things a bit cleaner.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       > One more thing that I would question here, is the separation of the refreshFunction and the xceiverClientFactory into an internal state inside this type. I am unsure, but most likely ECBlockInputStream will be used from KeyInputStream#addStream(), and in that case, we can pass on both the client factory and the refresh function to the create/provide method there, and nulls in the current tests.
   
   I think the reason I put them into the type was to cut down the number of parameters getting passed from ECKeyInputStream (which does not exist yet) to ECBlockInputStream. One instance of the factory / provider can be created in ECKeyInputStream and then passed to the ECBlockInputStream constructor. It does not care about or use the xceiverFactory or refreshFunction except to pass them onwards to the underlying BlockInputStream . Right now, I had that inside the factory / provider, rather than storing them as instance variables in ECBlockInputStream.
   
   To make this change, I would need to add two parameters variables to the ECBlockInputStream constructor and then store them as instance variables within the class, and then pass them to the create() method.
   
   If you feel it makes things better, I am happy to make the change I suggested above.
   
   > then a good alternative might be BlockInputStreamProvider,
   
   OK - I will change the name to provider and the method to "provide" rather than create.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690601965



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Interface used to create BlockInputStreams.
+ */
+public interface BlockInputStreamFactory {

Review comment:
       As we discussed on other channels, we may want to avoid confusion coming from the name "factory" as this is pretty much just an interface to define a method that creates (supplies) the stream, this way providing the possibility to inject mock instances from tests.
   
   We might want to use a Supplier<BlockInputStream> in the constructor where currently the factory is provided, and then from tests we can provide a TestBlockInputStream, or a mocked InputStream, while in the production code we can just provide a new BlockInputstream from a lambda expression.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel merged pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel merged pull request #2507:
URL: https://github.com/apache/ozone/pull/2507


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691158452



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {

Review comment:
       I was trying to save some boiler plate code, but I think they are better in separate tests so I have changed them as suggested.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691131751



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);

Review comment:
       I'm not sure what you are referring to here. Which format is missing?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] umamaheswararao commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
umamaheswararao commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r694391801



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       I am ok.

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamProviderImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create

Review comment:
       Nit: Please change the comment with the right class name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] fapifta commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
fapifta commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r690601965



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Interface used to create BlockInputStreams.
+ */
+public interface BlockInputStreamFactory {

Review comment:
       As we discussed on other channels, we may want to avoid confusion coming from the name "factory" as this is pretty much just an interface to define a method that creates (supplies) the stream, this way providing the possibility to inject mock instances from tests.
   
   We might want to use a Supplier<BlockInputStream> in the constructor where currently the factory is provided, and then from tests we can provide a TestBlockInputStream, or a mocked InputStream, while in the production code we can just provide a new BlockInputstream from a lambda expression.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();

Review comment:
       Nit: I don't think we need to clear the map we just created, or am I missing something?

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Interface used to create BlockInputStreams.
+ */
+public interface BlockInputStreamFactory {

Review comment:
       Sorry, my bad, I had some assumptions and overlooked a few things here...
   @sodonnel after you asked offline for some extra explanation what I am thinking about here, I went and tried out, and realized that even if it is possible to create a functional interface, and use a lambda, you are right, it does not really express anything better...

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import java.util.function.Function;
+
+/**
+ * Concrete implementation of a BlockInputStreamFactory to create
+ * BlockInputSteams in a real cluster.
+ */
+public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+
+  private final XceiverClientFactory xceiverClientFactory;
+  private final Function<BlockID, Pipeline> refreshFunction;
+
+  public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory,
+      Function<BlockID, Pipeline> refreshFunction) {
+    this.xceiverClientFactory = xceiverFactory;
+    this.refreshFunction = refreshFunction;
+  }
+
+  @Override
+  public BlockInputStream create(BlockID blockId, long blockLen,
+      Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
+      boolean verifyChecksum) {
+    return new BlockInputStream(blockId, blockLen, pipeline, token,

Review comment:
       I agree that this is more or less a strategy instead of a factory, in the sense of how it is used, on the other hand it is a strategy, given to the ECBlockInputstream, which provides regular BlockInputStream instances. In this sense it is creating objects. I checked into if it can be implemented as a functional interface, and passed in as a lambda, but Stephen is right, that would not make the code any more simpler or readable.
   If we agree (and I am) that factory would be an overload of the factory design concept, with different meaning here, then a good alternative might be BlockInputStreamProvider, or BlockInputStreamCreator as the name, but the code itself is fine. (Though, if we go with provider, I would change the method name to provide instead of create.)
   
   One more thing that I would question here, is the separation of the refreshFunction and the xceiverClientFactory into an internal state inside this type. I am unsure, but most likely ECBlockInputStream will be used from KeyInputStream#addStream(), and in that case, we can pass on both the client factory and the refresh function to the create/provide method there, and nulls in the current tests. 

##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       Just a note: if we go with ECBlockGroupInputStream, we also should rename ECBlockOutputStream, and keep the naming symmetric. I can live with both approach, if we are consistent with in/output stream names, but would go with the current one if it is on me ;)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691158647



##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
##########
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Tests for ECBlockInputStream.
+ */
+public class TestECBlockInputStream {
+
+  private static final int ONEMB = 1024 * 1024;
+
+  @Test
+  // TODO - this test will need changed when we can do recovery reads.
+  public void testSufficientLocations() {
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    // EC-3-2, 5MB block, so all 3 data locations are needed
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB);
+    ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, very large block, so all 3 data locations are needed
+    keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+
+    // EC-3-2, 1 byte short of 1MB with 1 location
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, ONEMB - 1, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertTrue(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 1MB block but only location is in slot 2 (should never happen)
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 2);
+    keyInfo = createKeyInfo(repConfig, ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 2 locations passed so we do not have sufficient
+    // locations.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+
+    // EC-3-2, 5MB blocks, only 1 data and 2 parity locations present. For now
+    // this will fail as we don't support reconstruction reads yet.
+    dnMap.clear();
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 4);
+    dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
+    keyInfo = createKeyInfo(repConfig, 5 * ONEMB, dnMap);
+    ecb = new ECBlockInputStream(repConfig, ONEMB,
+        keyInfo, true, new TestBlockInputStreamFactory());
+    Assert.assertFalse(ecb.hasSufficientLocations());
+    ecb.close();
+  }
+
+  @Test
+  public void testCorrectBlockSizePassedToBlockStream() throws IOException {
+    ByteBuffer buf = ByteBuffer.allocate(3*ONEMB);
+    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2);
+
+    TestBlockInputStreamFactory streamFactory =
+        new TestBlockInputStreamFactory();
+    ECBlockInputStream ecb = null;
+    List<TestBlockInputStream> streams = null;
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, ONEMB - 100);
+
+    // Block less than a single cell
+    try {
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      ecb.read(buf);
+      // We expect only 1 block stream and it should have a length passed of
+      // ONEMB - 100.
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB - 100, streams.get(0).getLength());
+    } finally {
+      if (ecb != null) {
+        ecb.close();
+      }
+    }
+
+    // Block of two cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(100, streams.get(1).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of three cells
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(ONEMB, streams.get(1).getLength());
+      Assert.assertEquals(100, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Block of 3 full stripes and a partial stripe
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,
+          keyInfo, true, streamFactory);
+      buf.clear();
+      ecb.read(buf);
+      streams = streamFactory.getBlockStreams();
+      Assert.assertEquals(4 * ONEMB, streams.get(0).getLength());
+      Assert.assertEquals(3 * ONEMB + 100, streams.get(1).getLength());
+      Assert.assertEquals(3 * ONEMB, streams.get(2).getLength());
+    } finally {
+      ecb.close();
+    }
+
+    // Single Full cell
+    try {
+      streamFactory = new TestBlockInputStreamFactory();
+      keyInfo = createKeyInfo(repConfig, 5, ONEMB);
+      ecb = new ECBlockInputStream(repConfig, ONEMB,

Review comment:
       OK - I have changed all the try to try with resources blocks and it makes things a bit cleaner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sodonnel commented on a change in pull request #2507: HDDS-4942. EC: Implement ECBlockInputStream to read a single EC BlockGroup

Posted by GitBox <gi...@apache.org>.
sodonnel commented on a change in pull request #2507:
URL: https://github.com/apache/ozone/pull/2507#discussion_r691127849



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
##########
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.CanUnbuffer;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteArrayReader;
+import org.apache.hadoop.hdds.scm.storage.ByteBufferReader;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends InputStream

Review comment:
       I think either name is good, so rather than change things, lets stick with the current ECBlockInputStream, privuded @umamaheswararao is happy?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org