You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2021/12/03 22:51:15 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 1b9724d  HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)
1b9724d is described below

commit 1b9724d60a7fcb485e3b837d028b50536e86c061
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Fri Dec 3 14:50:45 2021 -0800

    HDDS-5955: EC: Track the failed servers to add into the excludeList when invoking allocateBlock (#2849)
    
    Co-authored-by: Uma Maheswara Rao G <um...@cloudera.com>
---
 .../hdds/scm/storage/ECBlockOutputStream.java      | 30 +++++++++
 .../SCMContainerPlacementRackScatter.java          |  4 ++
 .../ozone/client/io/ECBlockOutputStreamEntry.java  | 34 ++++++----
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  | 71 ++++++++++++--------
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  2 +-
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 78 +++++++++++++++++++---
 6 files changed, 169 insertions(+), 50 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 5ee34d3..66e550f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
@@ -41,6 +42,7 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlock
  */
 public class ECBlockOutputStream extends BlockOutputStream{
 
+  private final DatanodeDetails datanodeDetails;
   private CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
       currentChunkRspFuture = null;
 
@@ -64,6 +66,8 @@ public class ECBlockOutputStream extends BlockOutputStream{
   ) throws IOException {
     super(blockID, xceiverClientManager,
         pipeline, bufferPool, config, token);
+    // In EC stream, there will be only one node in pipeline.
+    this.datanodeDetails = pipeline.getClosestNode();
   }
 
   @Override
@@ -144,4 +148,30 @@ public class ECBlockOutputStream extends BlockOutputStream{
       getCurrentPutBlkResponseFuture() {
     return this.putBlkRspFuture;
   }
+
+  /**
+   * Gets the target data node used in the current stream.
+   * @return DatanodeDetails
+   */
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  @Override
+  void validateResponse(
+      ContainerProtos.ContainerCommandResponseProto responseProto)
+      throws IOException {
+    try {
+      // If the ioException is already set, a previous request has failed.
+      // Skip validation and return, keeping the original error; the current
+      // operation will fail with that original error.
+      IOException exception = getIoException();
+      if (exception != null) {
+        return;
+      }
+      ContainerProtocolCalls.validateContainerResponse(responseProto);
+    } catch (IOException sce) {
+      setIoException(sce);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
index f3b2c0c..54e7238 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackScatter.java
@@ -179,6 +179,10 @@ public final class SCMContainerPlacementRackScatter
       }
 
       for (Node rack : toChooseRacks) {
+        if (rack == null) {
+          // TODO: need to recheck why null is coming here.
+          continue;
+        }
         Node node = chooseNode(rack.getNetworkFullPath(), unavailableNodes,
             metadataSizeRequired, dataSizeRequired);
         if (node != null) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 95609b6..0cb7b0f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -64,6 +65,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
 
   private ECBlockOutputStream[] blockOutputStreams;
   private int currentStreamIdx = 0;
+  private long successfulBlkGrpAckedLen;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   ECBlockOutputStreamEntry(BlockID blockID, String key,
@@ -201,13 +203,15 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
       return 0;
     }
     updateBlockID(underlyingBlockID());
-    // Returning zero here. Underlying streams in EC entry are
-    // ECBlockOutputStreams, extending from BlockOutputStream, without
-    // overriding getTotalAckDataLength, and default implementation returns
-    // constant zero, so even summarizing the return value of this method
-    // from blockStreams entries would yield to 0. Once this changes, we need
-    // to revisit this, and implement a proper sum of data or all streams.
-    return 0;
+
+    return this.successfulBlkGrpAckedLen;
+  }
+
+  void updateBlockGroupToAckedPosition(long len) {
+    if (isWritingParity()){
+      return;
+    }
+    this.successfulBlkGrpAckedLen += len;
   }
 
   /**
@@ -216,6 +220,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
    * In EC entries the parity writes does not count into this, as the written
    * data length represents the attempts of the classes using the entry, and
    * not the attempts of the entry itself.
+   *
    * @return 0 if the stream is not initialized, the amount of data bytes that
    *    were attempted to be written to the entry.
    */
@@ -267,7 +272,11 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
       if (stream == null) {
         continue;
       }
-      stream.executePutBlock(false, true);
+      try {
+        stream.executePutBlock(false, true);
+      } catch (Exception e) {
+        stream.setIoException(e);
+      }
     }
   }
 
@@ -291,8 +300,9 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
    *                   futures if false.
    * @return
    */
-  public boolean checkStreamFailures(boolean forPutBlock) {
+  public List<ECBlockOutputStream> getFailedStreams(boolean forPutBlock) {
     final Iterator<ECBlockOutputStream> iter = blockStreams().iterator();
+    List<ECBlockOutputStream> failedStreams = new ArrayList<>();
     while (iter.hasNext()) {
       final ECBlockOutputStream stream = iter.next();
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
@@ -305,10 +315,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
             stream != null ? stream.getCurrentChunkResponseFuture() : null;
       }
       if (isFailed(stream, responseFuture)) {
-        return true;
+        failedStreams.add(stream);
       }
     }
-    return false;
+    return failedStreams;
   }
 
   private boolean isFailed(
@@ -397,13 +407,11 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry{
       return this;
     }
 
-
     public ECBlockOutputStreamEntry.Builder setLength(long len) {
       this.length = len;
       return this;
     }
 
-
     public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
       this.bufferPool = pool;
       return this;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 24b79c7..f5480c7 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -57,7 +58,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
   private final int numParityBlks;
   private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
   private final RawErasureEncoder encoder;
-  private final ECReplicationConfig.EcCodec ecCodec;
 
   private enum StripeWriteStatus {
     SUCCESS,
@@ -100,7 +100,6 @@ public class ECKeyOutputStream extends KeyOutputStream {
     this.config = config;
     // For EC, cell/chunk size and buffer size can be same for now.
     ecChunkSize = replicationConfig.getEcChunkSize();
-    this.ecCodec = replicationConfig.getCodec();
     this.config.setStreamBufferMaxSize(ecChunkSize);
     this.config.setStreamBufferFlushSize(ecChunkSize);
     this.config.setStreamBufferSize(ecChunkSize);
@@ -218,7 +217,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
     // Rollback the length/offset updated as part of this failed stripe write.
     offset -= failedStripeDataSize;
     blockOutputStreamEntryPool.getCurrentStreamEntry()
-        .incCurrentPosition(-failedStripeDataSize);
+        .resetToAckedPosition();
 
     // Let's close the current entry.
     blockOutputStreamEntryPool.getCurrentStreamEntry().close();
@@ -256,14 +255,20 @@ public class ECKeyOutputStream extends KeyOutputStream {
       boolean allocateBlockIfFull) throws IOException {
     //check data blocks finished
     //If index is > datanum blks
-    int currentStreamIdx = blockOutputStreamEntryPool.getCurrentStreamEntry()
-        .getCurrentStreamIdx();
+    ECBlockOutputStreamEntry currentStreamEntry =
+        blockOutputStreamEntryPool.getCurrentStreamEntry();
+    int currentStreamIdx = currentStreamEntry.getCurrentStreamIdx();
     if (currentStreamIdx == numDataBlks && lastDataBuffPos == ecChunkSize) {
       //Lets encode and write
       if (handleParityWrites(ecChunkSize,
           allocateBlockIfFull) == StripeWriteStatus.FAILED) {
+        // TODO: This should retry until the write succeeds. (HDDS-6036)
         handleStripeFailure(ecChunkSize, numDataBlks * ecChunkSize);
       }
+      // At this stage stripe write is successful.
+      currentStreamEntry.updateBlockGroupToAckedPosition(
+          currentStreamEntry.getCurrentPosition());
+
     }
   }
 
@@ -276,13 +281,19 @@ public class ECKeyOutputStream extends KeyOutputStream {
     // TODO: we should alter the put block calls to share CRC to each stream.
     ECBlockOutputStreamEntry streamEntry =
         blockOutputStreamEntryPool.getCurrentStreamEntry();
+    List<ECBlockOutputStream> failedStreams =
+        streamEntry.getFailedStreams(false);
     // Since writes are async, let's check the failures once.
-    if(streamEntry.checkStreamFailures(false)){
+    if (failedStreams.size() > 0) {
+      addToExcludeNodesList(failedStreams);
       return StripeWriteStatus.FAILED;
     }
     streamEntry.executePutBlock();
+
+    failedStreams = streamEntry.getFailedStreams(true);
     // Since putBlock also async, let's check the failures again.
-    if(streamEntry.checkStreamFailures(true)){
+    if (failedStreams.size() > 0) {
+      addToExcludeNodesList(failedStreams);
       return StripeWriteStatus.FAILED;
     }
     ecChunkBufferCache.clear(parityCellSize);
@@ -299,6 +310,13 @@ public class ECKeyOutputStream extends KeyOutputStream {
     return StripeWriteStatus.SUCCESS;
   }
 
+  private void addToExcludeNodesList(List<ECBlockOutputStream> failedStreams) {
+    for (ECBlockOutputStream failedStream : failedStreams) {
+      blockOutputStreamEntryPool.getExcludeList()
+          .addDatanode(failedStream.getDatanodeDetails());
+    }
+  }
+
   void writeParityCells(int parityCellSize) throws IOException {
     final ByteBuffer[] buffers = ecChunkBufferCache.getDataBuffers();
     ecChunkBufferCache.allocateParityBuffers(parityCellSize);
@@ -351,8 +369,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
       try {
         // Since it's a fullcell, let's write all content from buffer.
         writeToOutputStream(current, len, bytesToWrite.array(),
-            bytesToWrite.array().length, 0, current.getWrittenDataLength(),
-            isParity);
+            bytesToWrite.array().length, 0, isParity);
       } catch (Exception e) {
         markStreamAsFailed(e);
       }
@@ -360,29 +377,18 @@ public class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private int writeToOutputStream(BlockOutputStreamEntry current, long len,
-      byte[] b, int writeLen, int off, long currentPos, boolean isParity)
+      byte[] b, int writeLen, int off, boolean isParity)
       throws IOException {
     try {
-      current.write(b, off, writeLen);
       if (!isParity) {
+        // If an exception occurs while writing, this length is rolled back
+        // as part of handleStripeFailure.
         offset += writeLen;
       }
+      current.write(b, off, writeLen);
     } catch (IOException ioe) {
-      // for the current iteration, totalDataWritten - currentPos gives the
-      // amount of data already written to the buffer
-
-      // In the retryPath, the total data to be written will always be equal
-      // to or less than the max length of the buffer allocated.
-      // The len specified here is the combined sum of the data length of
-      // the buffers
-      Preconditions.checkState(len <= config.getStreamBufferMaxSize());
-      int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
-      writeLen = dataWritten;
-
-      if (!isParity) {
-        offset += writeLen;
-      }
-      LOG.debug("writeLen {}, total len {}", writeLen, len);
+      LOG.debug("Exception:: writeLen: " + writeLen + ", total len:" + len,
+          ioe);
       handleException(current, ioe);
     }
     return writeLen;
@@ -392,6 +398,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
       IOException exception) throws IOException {
     Throwable t = HddsClientUtils.checkForException(exception);
     Preconditions.checkNotNull(t);
+    boolean containerExclusionException = checkIfContainerToExclude(t);
+    if (containerExclusionException) {
+      blockOutputStreamEntryPool.getExcludeList()
+          .addPipeline(streamEntry.getPipeline().getId());
+    }
     // In EC, we will just close the current stream.
     markStreamAsFailed(exception);
   }
@@ -458,8 +469,7 @@ public class ECKeyOutputStream extends KeyOutputStream {
           try {
             byte[] array = bytesToWrite.array();
             writeToOutputStream(current, bytesToWrite.position(), array,
-                bytesToWrite.position(), 0, current.getWrittenDataLength(),
-                false);
+                bytesToWrite.position(), 0, false);
           } catch (Exception e) {
             markStreamAsFailed(e);
           }
@@ -475,6 +485,11 @@ public class ECKeyOutputStream extends KeyOutputStream {
           // TODO: loop this until we succeed?
           handleStripeFailure(parityCellSize, lastStripeSize);
         }
+        blockOutputStreamEntryPool.getCurrentStreamEntry()
+            .updateBlockGroupToAckedPosition(
+                blockOutputStreamEntryPool.getCurrentStreamEntry()
+                    .getCurrentPosition());
+
       }
 
       closeCurrentStreamEntry();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 4e0fbf9..f7b323c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -431,7 +431,7 @@ public class KeyOutputStream extends OutputStream {
 
   // Every container specific exception from datatnode will be seen as
   // StorageContainerException
-  private boolean checkIfContainerToExclude(Throwable t) {
+  boolean checkIfContainerToExclude(Throwable t) {
     return t instanceof StorageContainerException;
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index e8b3d11..3549532 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -88,12 +89,21 @@ public class TestECKeyOutputStream {
     conf.setFromObject(clientConfig);
 
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    // If SCM detects a dead node too quickly, the container would be moved to
+    // the closed state and all in-progress writes would get an exception. To
+    // avoid that, we keep a higher timeout; none of the tests currently depend
+    // on the dead node detection timeout.
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 300, TimeUnit.SECONDS);
+    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
+        TimeUnit.SECONDS);
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
 
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
         .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
         .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
@@ -205,12 +215,7 @@ public class TestECKeyOutputStream {
 
   public void testMultipleChunksInSingleWriteOp(int numChunks)
       throws IOException {
-    byte[] inputData = new byte[numChunks * chunkSize];
-    for (int i = 0; i < numChunks; i++) {
-      int start = (i * chunkSize);
-      Arrays.fill(inputData, start, start + chunkSize - 1,
-          String.valueOf(i % 9).getBytes(UTF_8)[0]);
-    }
+    byte[] inputData = getInputBytes(numChunks);
     final OzoneBucket bucket = getOzoneBucket();
     String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
     try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
@@ -258,4 +263,61 @@ public class TestECKeyOutputStream {
     }
     return builder.toString().getBytes(UTF_8);
   }
+
+  @Test
+  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
+    int numChunks = 3;
+    byte[] inputData = getInputBytes(numChunks);
+    final OzoneBucket bucket = getOzoneBucket();
+    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+    try {
+      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
+          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+              chunkSize), new HashMap<>())) {
+        out.write(inputData);
+        // Kill a node from first pipeline
+        DatanodeDetails nodeToKill =
+            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+                .get(0).getPipeline().getFirstNode();
+        cluster.shutdownHddsDatanode(nodeToKill);
+
+        out.write(inputData);
+        // Check the second blockGroup pipeline to make sure that the failed
+        // node is not selected.
+        Assert.assertFalse(
+            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+                .get(1).getPipeline().getNodes().contains(nodeToKill));
+      }
+
+      try (OzoneInputStream is = bucket.readKey(keyName)) {
+        // TODO: this skip can be removed once read handles online recovery.
+        long skip = is.skip(inputData.length);
+        Assert.assertTrue(skip == inputData.length);
+        // All nodes are available in the second block group, so let's assert.
+        byte[] fileContent = new byte[inputData.length];
+        Assert.assertEquals(inputData.length, is.read(fileContent));
+        Assert.assertEquals(new String(inputData, UTF_8),
+            new String(fileContent, UTF_8));
+      }
+    } finally {
+      // TODO: optimize to just start the killed DN back.
+      resetCluster();
+    }
+  }
+
+  private void resetCluster() throws Exception {
+    cluster.shutdown();
+    init();
+  }
+
+  private byte[] getInputBytes(int numChunks) {
+    byte[] inputData = new byte[numChunks * chunkSize];
+    for (int i = 0; i < numChunks; i++) {
+      int start = (i * chunkSize);
+      Arrays.fill(inputData, start, start + chunkSize - 1,
+          String.valueOf(i % 9).getBytes(UTF_8)[0]);
+    }
+    return inputData;
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org