Posted to common-issues@hadoop.apache.org by "Yi Liu (JIRA)" <ji...@apache.org> on 2015/05/19 05:18:02 UTC

[jira] [Comment Edited] (HADOOP-11847) Enhance raw coder allowing to read least required inputs in decoding

    [ https://issues.apache.org/jira/browse/HADOOP-11847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14549697#comment-14549697 ] 

Yi Liu edited comment on HADOOP-11847 at 5/19/15 3:17 AM:
----------------------------------------------------------

Thanks Kai for the patch.

*In AbstractRawErasureCoder.java*
{code}
+      if (buffers[i] == null) {
+        if (allowNull) {
+          continue;
+        }
+        throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
+      }
{code}
Using the following code may be simpler:
{code}
if (buffers[i] == null && !allowNull) {
  throw new HadoopIllegalArgumentException("Invalid buffer found, not allowing null");
}
{code}


*In AbstractRawErasureDecoder.java*
Rename {{findGoodInput}} to {{getFirstNotNullInput}}. We can also use Java generics, and the implementation can be simplified:
{code}
  /**
   * Find the first non-null input.
   * @param inputs the input buffers to search
   * @return the first non-null input
   */
  protected <T> T getFirstNotNullInput(T[] inputs) {
    for (T input : inputs) {
      if (input != null) {
        return input;
      }
    }

    throw new HadoopIllegalArgumentException(
        "Invalid inputs are found, all being null");
  }
{code}
Doesn't the above look cleaner?
Then you can change
{code}
ByteBuffer goodInput = (ByteBuffer) findGoodInput(inputs);
{code} to
{code}
ByteBuffer firstNotNullInput = getFirstNotNullInput(inputs);
{code}
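
A nice side effect of the generic signature is that the same helper serves both decode paths with no cast. A hedged sketch of the two call sites (the variable names here are only illustrative):
{code}
// ByteBuffer decode path: no cast needed any more.
ByteBuffer firstNotNullInput = getFirstNotNullInput(byteBufferInputs);

// byte[] decode path: the same generic helper applies unchanged.
byte[] firstNotNullInputBytes = getFirstNotNullInput(bytesArrayInputs);
{code}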

{code}
protected int[] getErasedOrNotToReadIndexes(Object[] inputs) {
    int[] invalidIndexes = new int[inputs.length];
{code}
We could accept an {{int erasedNum}} parameter; then we can allocate the array at its exact size and avoid the array copy.
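
For illustration, a rough sketch of that idea (hedged: it assumes the caller already knows the number of null entries, and the parameter name is only a suggestion):
{code}
  /**
   * Sketch only: with erasedNum passed in, the result array is allocated at
   * its exact size, so no trailing trim/copy is needed. Assumes erasedNum
   * equals the number of null entries in inputs.
   */
  protected int[] getErasedOrNotToReadIndexes(Object[] inputs, int erasedNum) {
    int[] invalidIndexes = new int[erasedNum];
    int idx = 0;
    for (int i = 0; i < inputs.length; i++) {
      if (inputs[i] == null) {
        invalidIndexes[idx++] = i;
      }
    }
    return invalidIndexes;
  }
{code}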


*In RSRawDecoder.java*
{code}
/**
   * We need a set of reusable buffers either for the bytes array
   * decoding version or direct buffer decoding version. Normally not both.
   *
   * For both input and output, in addition to the valid buffers from the caller
   * passed from above, we need to provide extra buffers for the internal
   * decoding implementation. For input, the caller should provide at least
   * numDataUnits valid buffers (non-NULL); for output, the caller should 
   * provide no more than numParityUnits but at least one buffers. And the left
   * buffers will be borrowed from either bytesArrayBuffersForInput or 
   * bytesArrayBuffersForOutput, for the bytes array version.
   *
   */
  // Reused buffers for decoding with bytes arrays
  private byte[][] bytesArrayBuffers;
  private byte[][] adjustedByteArrayInputsParameter;
  private byte[][] adjustedByteArrayOutputsParameter;
  private int[] adjustedInputOffsets;
  private int[] adjustedOutputOffsets;

  // Reused buffers for decoding with direct ByteBuffers
  private ByteBuffer[] directBuffers;
  private ByteBuffer[] adjustedDirectBufferInputsParameter;
  private ByteBuffer[] adjustedDirectBufferOutputsParameter;
{code}
I don't think we need these.

{code}
@Override
  protected void doDecode(byte[][] inputs, int[] inputOffsets,
                          int dataLen, int[] erasedIndexes,
                          byte[][] outputs, int[] outputOffsets) {
    ensureBytesArrayBuffers(dataLen);

    /**
     * As passed parameters are friendly to callers but not to the underlying
     * implementations, so we have to adjust them before calling doDecoder.
     */

    int[] erasedOrNotToReadIndexes = getErasedOrNotToReadIndexes(inputs);
    int bufferIdx = 0, erasedIdx;

    // Prepare for adjustedInputsParameter and adjustedInputOffsets
    System.arraycopy(inputs, 0, adjustedByteArrayInputsParameter,
        0, inputs.length);
    System.arraycopy(inputOffsets, 0, adjustedInputOffsets,
        0, inputOffsets.length);
    for (int i = 0; i < erasedOrNotToReadIndexes.length; i++) {
      // Borrow it from bytesArrayBuffersForInput for the temp usage.
      erasedIdx = erasedOrNotToReadIndexes[i];
      adjustedByteArrayInputsParameter[erasedIdx] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedInputOffsets[erasedIdx] = 0; // Always 0 for such temp input
    }

    // Prepare for adjustedOutputsParameter
    for (int i = 0; i < adjustedByteArrayOutputsParameter.length; i++) {
      adjustedByteArrayOutputsParameter[i] =
          resetBuffer(bytesArrayBuffers[bufferIdx++], 0, dataLen);
      adjustedOutputOffsets[i] = 0; // Always 0 for such temp output
    }
    for (int outputIdx = 0, i = 0;
         i < erasedIndexes.length; i++, outputIdx++) {
      for (int j = 0; j < erasedOrNotToReadIndexes.length; j++) {
        // If this index is one requested by the caller via erasedIndexes, then
        // we use the passed output buffer to avoid copying data thereafter.
        if (erasedIndexes[i] == erasedOrNotToReadIndexes[j]) {
          adjustedByteArrayOutputsParameter[j] =
              resetBuffer(outputs[outputIdx], 0, dataLen);
          adjustedOutputOffsets[j] = outputOffsets[outputIdx];
        }
      }
    }

    doDecodeImpl(adjustedByteArrayInputsParameter, adjustedInputOffsets,
        dataLen, erasedOrNotToReadIndexes,
        adjustedByteArrayOutputsParameter, adjustedOutputOffsets);
  }
{code}
I think we can use the inputs/outputs directly with no copying; we just need some modifications to {{RSUtil.GF}}. Maybe we can discuss offline.

{code}
TODO: HADOOP-11871
 * currently this implementation will compute and decode not to read
 * units unnecessarily due to the underlying implementation limit in GF.
{code}
It should be easy to implement that in this JIRA.

*In test*
I'd like to see the following tests (here I only use 6+3 as an example; the real tests should cover different schemas); a sketch follows the list:
1. All missing chunks are data chunks (since at most 3 chunks can be missing, test 1 chunk missing, 2 chunks missing, and 3 chunks missing).
2. All missing chunks are parity chunks (same as above).
3. Both data and parity chunks are missing (same as above).
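
To make the matrix concrete, here is a minimal JUnit sketch for the 6+3 case. It assumes a hypothetical helper {{decodeAndVerify(erasedDataIndexes, erasedParityIndexes)}} that erases the given chunks, decodes, and compares the recovered chunks with the originals; the helper name and the chosen indexes are only illustrative, not an existing API.
{code}
// Sketch only: decodeAndVerify(...) is a hypothetical test helper; it should
// erase the given data/parity chunks, decode using only the least required
// inputs, and verify the recovered chunks against the originals.
@Test
public void testDecodeWithOnlyDataChunksMissing() {
  decodeAndVerify(new int[]{2}, new int[]{});         // 1 data chunk missing
  decodeAndVerify(new int[]{2, 4}, new int[]{});      // 2 data chunks missing
  decodeAndVerify(new int[]{1, 2, 4}, new int[]{});   // 3 data chunks missing
}

@Test
public void testDecodeWithOnlyParityChunksMissing() {
  decodeAndVerify(new int[]{}, new int[]{0});         // 1 parity chunk missing
  decodeAndVerify(new int[]{}, new int[]{0, 2});      // 2 parity chunks missing
  decodeAndVerify(new int[]{}, new int[]{0, 1, 2});   // 3 parity chunks missing
}

@Test
public void testDecodeWithDataAndParityChunksMissing() {
  decodeAndVerify(new int[]{2}, new int[]{0});        // 1 data + 1 parity missing
  decodeAndVerify(new int[]{2, 4}, new int[]{1});     // 2 data + 1 parity missing
  decodeAndVerify(new int[]{2}, new int[]{0, 1});     // 1 data + 2 parity missing
}
{code}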



> Enhance raw coder allowing to read least required inputs in decoding
> --------------------------------------------------------------------
>
>                 Key: HADOOP-11847
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11847
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: io
>            Reporter: Kai Zheng
>            Assignee: Kai Zheng
>              Labels: BB2015-05-TBR
>         Attachments: HADOOP-11847-HDFS-7285-v3.patch, HADOOP-11847-HDFS-7285-v4.patch, HADOOP-11847-HDFS-7285-v5.patch, HADOOP-11847-v1.patch, HADOOP-11847-v2.patch
>
>
> This is to enhance the raw erasure coder to allow reading only the least required inputs while decoding. It will also refine and document the relevant APIs for better understanding and usage. Using only the least required inputs may add computing overhead, but it will possibly outperform overall since less network traffic and disk IO are involved.
> This is something already planned, but I was just reminded of it by [~zhz]'s question raised in HDFS-7678, also copied here:
> bq.Kai Zheng I have a question about decoding: in a (6+3) schema, if block #2 is missing, and I want to repair it with blocks 0, 1, 3, 4, 5, 8, how should I construct the inputs to RawErasureDecoder#decode?
> With this work, hopefully the answer to the above question will be obvious.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)